下面来自OrderCommandHandler类的代码示例显示了如何调用存储库中的Find方法来启动此过程。
public void Handle(MarkSeatsAsReserved command) { var order = repository.Find(command.OrderId); ... }下面的代码示例显示了SqlEventSourcedRepository类如何加载与聚合关联的事件流。
Jana(软件架构师)发言:该团队后来使用Azure表而不是SqlEventSourcedRepository开发了一个简单的事件存储。下一节将描述这种基于Azure表存储的实现。 public T Find(Guid id) { using (var context = this.contextFactory.Invoke()) { var deserialized = context.Set<Event>() .Where(x => x.AggregateId == id) .OrderBy(x => x.Version) .AsEnumerable() .Select(x => this.serializer.Deserialize(new MemoryStream(x.Payload))) .Cast<IVersionedEvent>() .AsCachedAnyEnumerable(); if (deserialized.Any()) { return entityFactory.Invoke(id, deserialized); } return null; } }
下面的代码示例显示了当前面的代码调用Invoke方法时候,Order类中的构造函数是怎样从自己的事件流里重建状态的。
public Order(Guid id, IEnumerable<IVersionedEvent> history) : this(id) { this.LoadFrom(history); }LoadFrom方法在EventSourced类中定义,如下面的代码示例所示。对于历史中存储的每个事件,它确定要在Order类中调用的适当处理程序方法,并更新聚合实例的版本号。
protected void LoadFrom(IEnumerable<IVersionedEvent> pastEvents) { foreach (var e in pastEvents) { this.handlers[e.GetType()].Invoke(e); this.version = e.Version; } } 简单事件存储实现的一些问题前面几节中概述的事件源和事件存储的简单实现有许多缺点。下面的列表列出了在生产质量的实现中应该克服的一些缺点。
SqlEventRepository类中的Save方法不能保证将事件持久存储并发布到消息传递基础设施。失败可能导致事件被保存到存储区,但不会发布。
没有检查当系统持久保存一个事件时,它是否是比前一个事件晚一些的事件。事件需要被按顺序存储。
对于事件流中有大量事件的聚合实例,没有适当的优化。这可能会在重播事件时导致性能问题。
基于Azure表的事件存储基于Azure表实现的事件存储解决了简单的基于SQL server实现的事件存储的一些缺点。然而,在这一点上,它仍然不是一个生产质量的实现。
团队设计此实现是为了确保事件既被持久化到存储中,又被发布在消息总线上。为了实现这一点,它使用了Azure表的事务功能。
Markus(软件开发人员)发言:Azure表存储支持跨共享相同分区键的记录的事务。
EventStore类最初保存要持久化的每个事件的两个副本。一个副本是该事件的永久记录,另一个副本成为必须在Azure服务总线上发布的事件虚拟队列的一部分。下面的代码示例显示了EventStore类中的Save方法。前缀“Unpublished”标识事件的副本,该副本是未发布事件的虚拟队列的一部分。
public void Save(string partitionKey, IEnumerable<EventData> events) { var context = this.tableClient.GetDataServiceContext(); foreach (var eventData in events) { var formattedVersion = eventData.Version.ToString("D10"); context.AddObject( this.tableName, new EventTableServiceEntity { PartitionKey = partitionKey, RowKey = formattedVersion, SourceId = eventData.SourceId, SourceType = eventData.SourceType, EventType = eventData.EventType, Payload = eventData.Payload }); // Add a duplicate of this event to the Unpublished "queue" context.AddObject( this.tableName, new EventTableServiceEntity { PartitionKey = partitionKey, RowKey = UnpublishedRowKeyPrefix + formattedVersion, SourceId = eventData.SourceId, SourceType = eventData.SourceType, EventType = eventData.EventType, Payload = eventData.Payload }); } try { this.eventStoreRetryPolicy.ExecuteAction(() => context.SaveChanges(SaveChangesOptions.Batch)); } catch (DataServiceRequestException ex) { var inner = ex.InnerException as DataServiceClientException; if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict) { throw new ConcurrencyException(); } throw; } }备注:此代码示例还说明了如何使用重复键错误来标识并发错误。
repository类中的Save方法如下所示。此方法由事件处理程序类调用,它调用前面代码示例中所示的Save方法,并调用EventStoreBusPublisher类的SendAsync方法。
public void Save(T eventSourced) { var events = eventSourced.Events.ToArray(); var serialized = events.Select(this.Serialize); var partitionKey = this.GetPartitionKey(eventSourced.Id); this.eventStore.Save(partitionKey, serialized); this.publisher.SendAsync(partitionKey); }