CQRS之旅——旅程7(增加弹性和优化性能) (9)

我们考虑的另一种方法是在V3处理器中同时包含V2和V3处理。使用这种方法,在迁移期间不需要一个特别的工作人员角色来处理V2事件。但是,我们决定将特定于迁移的代码保存在一个单独的项目中,以避免V3发行版由于包含只在迁移期间需要的功能而膨胀。

Jana(软件架构师)发言:

如果我们在V3处理器中同时包含V2和V3处理,迁移过程会稍微容易一些。但我们认为,这种方法的好处被不必在V3处理器中维护重复功能的好处所抵消。

迁移的每个步骤之间的间隔需要一些时间来完成,因此迁移不会导致停机,但是用户确实会遇到延迟。我们可以从处理切换开关的一些更快的机制中获益,比如停止V2处理器并启动V3处理器。

实现细节

本节描述订单和注册限界上下文的实现的一些重要功能。您可能会发现拥有一份代码拷贝很有用,这样您就可以继续学习了。您可以从Download center下载一个副本,或者在GitHub上查看存储库:https://github.com/mspnp/cqrs-journey-code。您可以从GitHub上的Tags页面下载V3版本的代码。

备注:不要期望代码示例与参考实现中的代码完全匹配。本章描述了CQRS过程中的一个步骤,随着我们了解更多并重构代码,实现可能会发生变化。 增强RegistrationProcessManager类

本节描述了团队如何通过检查SeatsReservedOrderPlaced消息的重复实例来强化RegistrationProcessManager流程管理器。

检测无序的SeatsReserved事件

通常,RegistrationProcessManager类向SeatAvailability聚合发送一个MakeSeatReservation命令,SeatAvailability聚合在进行预订时发布一个SeatsReserved事件,RegistrationProcessManager接收此通知。RegistrationProcessManager在创建订单和更新订单时都发送一条MakeSeatReservation命令。SeatsReserve事件到达的时候可能不是按顺序的,但是,系统应该尊重与最后发送的命令相关的事件。本节描述的解决方案使RegistrationProcessManager能够识别最新的SeatsReserved消息,然后忽略任何较早的消息,而不是重新处理它们。

RegistrationProcessManager类发送MakeSeatReservation命令之前,它将该命令的Id保存在SeatReservationCommandId变量中,如下面的代码示例所示:

public void Handle(OrderPlaced message) { if (this.State == ProcessState.NotStarted) { this.ConferenceId = message.ConferenceId; this.OrderId = message.SourceId; // Use the order id as an opaque reservation id for the seat reservation. // It could be anything else, as long as it is deterministic from the // OrderPlaced event. this.ReservationId = message.SourceId; this.ReservationAutoExpiration = message.ReservationAutoExpiration; var expirationWindow = message.ReservationAutoExpiration.Subtract(DateTime.UtcNow); if (expirationWindow > TimeSpan.Zero) { this.State = ProcessState.AwaitingReservationConfirmation; var seatReservationCommand = new MakeSeatReservation { ConferenceId = this.ConferenceId, ReservationId = this.ReservationId, Seats = message.Seats.ToList() }; this.SeatReservationCommandId = seatReservationCommand.Id; this.AddCommand(new Envelope<ICommand>(seatReservationCommand) { TimeToLive = expirationWindow.Add(TimeSpan.FromMinutes(1)), }); ... }

然后,当它处理SeatsReserved事件时,它检查该事件的CorrelationId属性是否匹配SeatReservationCommandId变量的最新值,如下面的代码示例所示:

public void Handle(Envelope<SeatsReserved> envelope) { if (this.State == ProcessState.AwaitingReservationConfirmation) { if (envelope.CorrelationId != null) { if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) != 0) { // Skip this event. Trace.TraceWarning("Seat reservation response for reservation id {0} does not match the expected correlation id.", envelope.Body.ReservationId); return; } } ... }

注意这个Handle方法如何处理Envelope实例而不是SeatsReserved实例。作为V3版本的一部分,事件被封装在一个包含CorrelationId属性的Envelope实例中。EventDispatcher中的DoDispatchMessage方法分配关联Id的值。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspxg.html