CQRS之旅——旅程6(我们系统的版本管理) (2)

在基础设施中映射和过滤事件消息是一种选择。此方法是对旧的事件消息和消息格式进行处理,在它们到达领域之前在基础设施的某个位置处理它们。您可以过滤掉不再相关的旧消息,并使用映射将旧格式的消息转换为新格式。这种方法最初比较复杂,因为它需要对基础设施进行更改,但是它可以保持领域域的纯粹,领域只需要理解当前的新事件集合就可以了。

在聚合中处理多个版本的消息

在聚合中处理多个版本的消息是另一种选择。在这种方法中,所有消息类型(包括旧消息和新消息)都传递到领域,每个聚合必须能够处理旧消息和新消息。从短期来看,这可能是一个合适的策略,但它最终会导致域模型受到遗留事件处理程序的污染。

团队为V2版本选择了这个选项,因为它包含了最少数量的代码更改。

CQRS之旅——旅程6(我们系统的版本管理)

Jana(软件架构师)发言:

当前在聚合中处理旧事件和新事件并不妨碍您以后使用第一种选择:在基础设施中使用映射/过滤机制。

履行消息幂等性

V2版本中要解决的一个关键问题是使系统更加健壮。在V1版本中,在某些场景中,可能会多次处理某些消息,导致系统中的数据不正确或不一致。

CQRS之旅——旅程6(我们系统的版本管理)

Jana(软件架构师)发言:

消息幂等性在任何使用消息传递的系统中都很重要,这不仅仅是在实现CQRS模式或使用事件源的系统中。

在某些场景中,设计幂等消息是可能的,例如:使用“将座位配额设置为500”的消息,而不是“在座位配额中增加100”的消息。您可以安全地多次处理第一个消息,但不能处理第二个消息。

然而,并不总是能够使用幂等消息,因此团队决定使用Azure服务总线的重复删除特性,以确保它只传递一次消息。团队对基础设施进行了一些更改,以确保Azure服务总线能够检测重复消息,并配置Azure服务总线来执行重复消息检测。

要了解Contoso是如何实现这一点的,请参阅下面的“不让命令消息重复”一节。此外,我们需要考虑系统中的消息处理程序如何从队列和Topic检索消息。当前的方法使用Azure服务总线peek/lock机制。这是一个分成三个阶段的过程:

处理程序从队列或Topic检索消息,并在其中留下消息的锁定副本。其他客户端无法看到或访问锁定的消息。

处理程序处理消息。

处理程序从队列中删除锁定的消息。如果锁定的消息在固定时间后没有解锁或删除,则解锁该消息并使其可用,以便再次检索。

如果步骤由于某种原因失败,这意味着系统可以不止一次地处理消息。

CQRS之旅——旅程6(我们系统的版本管理)

Jana(软件架构师)发言:

该团队计划在旅程的下一阶段解决这个问题(步骤失败的问题)。更多信息,请参见第7章“添加弹性和优化性能”。

阻止多次处理事件

在V1中,在某些场景里,如果在处理事件时发生错误,系统可能多次处理事件。为了避免这种情况,团队修改了体系结构,以便每个事件处理程序都有自己对Azure Topic的订阅。下图显示了两个不同的模型。

CQRS之旅——旅程6(我们系统的版本管理)

在V1中,可能发生以下行为:

EventProcessor实例从服务总线中的所有订阅者那里接收到OrderPlaced事件。

EventProcessor实例有两个已注册的处理程序,RegistrationProcessManagerRouterOrderViewModelGenerator处理程序类,所以会在两个里都触发调用Handle方法。

OrderViewModelGenerator类中的Handle方法执行成功。

RegistrationProcessManagerRouter类中的Handle方法抛出异常。

EventProcessor实例捕获到异常然后抛弃掉事件消息。消息将自动放回订阅中。

EventProcessor实例第二次从所有订阅者那里接收到OrderPlaced事件。

事件又触发两个处理方法,导致RegistrationProcessManagerRouter类和OrderViewModelGenerator第二次处理事件消息。

每当RegistrationProcessManagerRouter类抛出异常时,OrderViewModelGenerator类都会触发处理该事件。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgw.html