CQRS之旅——旅程6(我们系统的版本管理) (6)

订单和注册限界上下文中的ConferenceViewModelGenerator类现在处理这些事件,并使用它们来计算和存储读模型中的座位类型数量。下面的代码示例显示了ConferenceViewModelGenerator类中的相关处理程序:

public void Handle(AvailableSeatsChanged @event) { this.UpdateAvailableQuantity(@event, @event.Seats); } public void Handle(SeatsReserved @event) { this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged); } public void Handle(SeatsReservationCancelled @event) { this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged); } private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats) { using (var repository = this.contextFactory.Invoke()) { var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId); if (dto != null) { if (@event.Version > dto.SeatsAvailabilityVersion) { foreach (var seat in seats) { var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType); if (seatDto != null) { seatDto.AvailableQuantity += seat.Quantity; } else { Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType); } } dto.SeatsAvailabilityVersion = @event.Version; repository.Save(dto); } else { Trace.TraceWarning ... } } else { Trace.TraceError ... } } }

UpdateAvailableQuantity方法将事件上的版本与读模型的当前版本进行比较,以检测可能的重复消息。

CQRS之旅——旅程6(我们系统的版本管理)

Markus(软件开发人员)发言:

此检查仅检测重复的消息,而不是超出序列的消息。

修改UI以显示剩余的座位数量

现在,当UI向会议的读模型查询座位类型列表时,列表包括当前可用的座位数量。下面的代码示例显示了RegistrationController MVC控制器如何使用SeatType类的AvailableQuantity

private OrderViewModel CreateViewModel() { var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id); var viewModel = new OrderViewModel { ConferenceId = this.ConferenceAlias.Id, ConferenceCode = this.ConferenceAlias.Code, ConferenceName = this.ConferenceAlias.Name, Items = seatTypes.Select( s => new OrderItemViewModel { SeatType = s, OrderItem = new DraftOrderItem(s.Id, 0), AvailableQuantityForOrder = s.AvailableQuantity, MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20) }).ToList(), }; return viewModel; } 数据迁移

保存会议读模型数据的数据库有一个新列来保存用于检查重复事件的版本号,而保存座位类型读模型数据有一个新列来保存可用的座椅数量。

作为数据迁移的一部分,有必要为每个可用座位(SeatsAvailability)聚合重放事件存储中的所有事件,以便正确计算可用数量。

不让命令消息重复

系统目前使用Azure服务总线传输消息。当系统从ConferenceProcessor类的启动代码初始化Azure服务总线时,它配置Topic来检测重复的消息,如下面的ServiceBusConfig类的代码示例所示:

private void CreateTopicIfNotExists() { var topicDescription = new TopicDescription(this.topic) { RequiresDuplicateDetection = true, DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow, }; try { this.namespaceManager.CreateTopic(topicDescription); } catch (MessagingEntityAlreadyExistsException) { } } 备注:您可以在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow 可以向Topic元素添加这个属性。默认值是1小时。

但是,为了使重复检测工作正常,您必须确保每个消息都有一个惟一的ID。下面的代码示例显示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand { public MarkSeatsAsReserved() { this.Id = Guid.NewGuid(); this.Seats = new List<SeatQuantity>(); } public Guid Id { get; set; } public Guid OrderId { get; set; } public List<SeatQuantity> Seats { get; set; } public DateTime Expiration { get; set; } }

CommandBus类中的BuildMessage方法使用命令Id创建一个惟一的消息Id, Azure服务总线可以使用这个消息Id来检测重复:

private BrokeredMessage BuildMessage(Envelope command) { var stream = new MemoryStream(); ... var message = new BrokeredMessage(stream, true); if (!default(Guid).Equals(command.Body.Id)) { message.MessageId = command.Body.Id.ToString(); } ... return message; } 保证消息顺序

团队决定使用Azure服务总线消息会话来保证系统中的消息顺序。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgw.html