在基础设施中映射和过滤事件消息是一种选择。此方法是对旧的事件消息和消息格式进行处理,在它们到达领域之前在基础设施的某个位置处理它们。您可以过滤掉不再相关的旧消息,并使用映射将旧格式的消息转换为新格式。这种方法最初比较复杂,因为它需要对基础设施进行更改,但是它可以保持领域域的纯粹,领域只需要理解当前的新事件集合就可以了。
在聚合中处理多个版本的消息在聚合中处理多个版本的消息是另一种选择。在这种方法中,所有消息类型(包括旧消息和新消息)都传递到领域,每个聚合必须能够处理旧消息和新消息。从短期来看,这可能是一个合适的策略,但它最终会导致域模型受到遗留事件处理程序的污染。
团队为V2版本选择了这个选项,因为它包含了最少数量的代码更改。
Jana(软件架构师)发言:当前在聚合中处理旧事件和新事件并不妨碍您以后使用第一种选择:在基础设施中使用映射/过滤机制。 履行消息幂等性
V2版本中要解决的一个关键问题是使系统更加健壮。在V1版本中,在某些场景中,可能会多次处理某些消息,导致系统中的数据不正确或不一致。
Jana(软件架构师)发言:消息幂等性在任何使用消息传递的系统中都很重要,这不仅仅是在实现CQRS模式或使用事件源的系统中。
在某些场景中,设计幂等消息是可能的,例如:使用“将座位配额设置为500”的消息,而不是“在座位配额中增加100”的消息。您可以安全地多次处理第一个消息,但不能处理第二个消息。
然而,并不总是能够使用幂等消息,因此团队决定使用Azure服务总线的重复删除特性,以确保它只传递一次消息。团队对基础设施进行了一些更改,以确保Azure服务总线能够检测重复消息,并配置Azure服务总线来执行重复消息检测。
要了解Contoso是如何实现这一点的,请参阅下面的“不让命令消息重复”一节。此外,我们需要考虑系统中的消息处理程序如何从队列和Topic检索消息。当前的方法使用Azure服务总线peek/lock机制。这是一个分成三个阶段的过程:
处理程序从队列或Topic检索消息,并在其中留下消息的锁定副本。其他客户端无法看到或访问锁定的消息。
处理程序处理消息。
处理程序从队列中删除锁定的消息。如果锁定的消息在固定时间后没有解锁或删除,则解锁该消息并使其可用,以便再次检索。
如果步骤由于某种原因失败,这意味着系统可以不止一次地处理消息。
Jana(软件架构师)发言:该团队计划在旅程的下一阶段解决这个问题(步骤失败的问题)。更多信息,请参见第7章“添加弹性和优化性能”。 阻止多次处理事件
在V1中,在某些场景里,如果在处理事件时发生错误,系统可能多次处理事件。为了避免这种情况,团队修改了体系结构,以便每个事件处理程序都有自己对Azure Topic的订阅。下图显示了两个不同的模型。
在V1中,可能发生以下行为:
EventProcessor实例从服务总线中的所有订阅者那里接收到OrderPlaced事件。
EventProcessor实例有两个已注册的处理程序,RegistrationProcessManagerRouter和OrderViewModelGenerator处理程序类,所以会在两个里都触发调用Handle方法。
在OrderViewModelGenerator类中的Handle方法执行成功。
在RegistrationProcessManagerRouter类中的Handle方法抛出异常。
EventProcessor实例捕获到异常然后抛弃掉事件消息。消息将自动放回订阅中。
EventProcessor实例第二次从所有订阅者那里接收到OrderPlaced事件。
事件又触发两个处理方法,导致RegistrationProcessManagerRouter类和OrderViewModelGenerator第二次处理事件消息。
每当RegistrationProcessManagerRouter类抛出异常时,OrderViewModelGenerator类都会触发处理该事件。