CQRS之旅——旅程5(准备发布V1版本) (11)

下面来自OrderCommandHandler类的代码示例显示了如何调用存储库中的Find方法来启动此过程。

public void Handle(MarkSeatsAsReserved command) { var order = repository.Find(command.OrderId); ... }

下面的代码示例显示了SqlEventSourcedRepository类如何加载与聚合关联的事件流。

CQRS之旅——旅程5(准备发布V1版本)

Jana(软件架构师)发言:

该团队后来使用Azure表而不是SqlEventSourcedRepository开发了一个简单的事件存储。下一节将描述这种基于Azure表存储的实现。

public T Find(Guid id) { using (var context = this.contextFactory.Invoke()) { var deserialized = context.Set<Event>() .Where(x => x.AggregateId == id) .OrderBy(x => x.Version) .AsEnumerable() .Select(x => this.serializer.Deserialize(new MemoryStream(x.Payload))) .Cast<IVersionedEvent>() .AsCachedAnyEnumerable(); if (deserialized.Any()) { return entityFactory.Invoke(id, deserialized); } return null; } }

下面的代码示例显示了当前面的代码调用Invoke方法时候,Order类中的构造函数是怎样从自己的事件流里重建状态的。

public Order(Guid id, IEnumerable<IVersionedEvent> history) : this(id) { this.LoadFrom(history); }

LoadFrom方法在EventSourced类中定义,如下面的代码示例所示。对于历史中存储的每个事件,它确定要在Order类中调用的适当处理程序方法,并更新聚合实例的版本号。

protected void LoadFrom(IEnumerable<IVersionedEvent> pastEvents) { foreach (var e in pastEvents) { this.handlers[e.GetType()].Invoke(e); this.version = e.Version; } } 简单事件存储实现的一些问题

前面几节中概述的事件源和事件存储的简单实现有许多缺点。下面的列表列出了在生产质量的实现中应该克服的一些缺点。

SqlEventRepository类中的Save方法不能保证将事件持久存储并发布到消息传递基础设施。失败可能导致事件被保存到存储区,但不会发布。

没有检查当系统持久保存一个事件时,它是否是比前一个事件晚一些的事件。事件需要被按顺序存储。

对于事件流中有大量事件的聚合实例,没有适当的优化。这可能会在重播事件时导致性能问题。

基于Azure表的事件存储

基于Azure表实现的事件存储解决了简单的基于SQL server实现的事件存储的一些缺点。然而,在这一点上,它仍然不是一个生产质量的实现。

团队设计此实现是为了确保事件既被持久化到存储中,又被发布在消息总线上。为了实现这一点,它使用了Azure表的事务功能。

CQRS之旅——旅程5(准备发布V1版本)

Markus(软件开发人员)发言:

Azure表存储支持跨共享相同分区键的记录的事务。

EventStore类最初保存要持久化的每个事件的两个副本。一个副本是该事件的永久记录,另一个副本成为必须在Azure服务总线上发布的事件虚拟队列的一部分。下面的代码示例显示了EventStore类中的Save方法。前缀“Unpublished”标识事件的副本,该副本是未发布事件的虚拟队列的一部分。

public void Save(string partitionKey, IEnumerable<EventData> events) { var context = this.tableClient.GetDataServiceContext(); foreach (var eventData in events) { var formattedVersion = eventData.Version.ToString("D10"); context.AddObject( this.tableName, new EventTableServiceEntity { PartitionKey = partitionKey, RowKey = formattedVersion, SourceId = eventData.SourceId, SourceType = eventData.SourceType, EventType = eventData.EventType, Payload = eventData.Payload }); // Add a duplicate of this event to the Unpublished "queue" context.AddObject( this.tableName, new EventTableServiceEntity { PartitionKey = partitionKey, RowKey = UnpublishedRowKeyPrefix + formattedVersion, SourceId = eventData.SourceId, SourceType = eventData.SourceType, EventType = eventData.EventType, Payload = eventData.Payload }); } try { this.eventStoreRetryPolicy.ExecuteAction(() => context.SaveChanges(SaveChangesOptions.Batch)); } catch (DataServiceRequestException ex) { var inner = ex.InnerException as DataServiceClientException; if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict) { throw new ConcurrencyException(); } throw; } }

备注:此代码示例还说明了如何使用重复键错误来标识并发错误。

repository类中的Save方法如下所示。此方法由事件处理程序类调用,它调用前面代码示例中所示的Save方法,并调用EventStoreBusPublisher类的SendAsync方法。

public void Save(T eventSourced) { var events = eventSourced.Events.ToArray(); var serialized = events.Select(this.Serialize); var partitionKey = this.GetPartitionKey(eventSourced.Id); this.eventStore.Save(partitionKey, serialized); this.publisher.SendAsync(partitionKey); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjgxy.html