CQRS之旅——旅程7(增加弹性和优化性能) (14)

团队向Azure服务总线订阅添加了过滤器,以将每个订阅接收到的消息限制为订阅打算处理的消息。您可以在Settings.Template.xml文件中看到这些过滤器的定义,如下面的代码片段所示:

<Topic Path="conference/events" IsEventBus="true"> <Subscription RequiresSession="false"/> <Subscription RequiresSession="false" SqlFilter="TypeName IN ('OrderPlaced')"/> <Subscription RequiresSession="false" SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderUpdated','OrderPartiallyReserved','OrderReservationCompleted','OrderRegistrantAssigned','OrderConfirmed','OrderPaymentConfirmed')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('ConferenceCreated','ConferenceUpdated','ConferencePublished','ConferenceUnpublished','SeatCreated','SeatUpdated','AvailableSeatsChanged','SeatsReserved','SeatsReservationCancelled')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('SeatAssignmentsCreated','SeatAssigned','SeatUnassigned','SeatAssignmentUpdated')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('OrderConfirmed','OrderPaymentConfirmed')"/> <Subscription RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderRegistrantAssigned','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatAssigned','SeatAssignmentUpdated','SeatUnassigned')"/> ... </Topic> 为可用座位(SeatsAvailability)聚合创建一个专用的SessionSubscriptionReceiver实例

在V2版本中,系统没有为命令使用会话,因为我们不需要命令的顺序保证。然而,我们现在希望为命令使用会话来保证每个可用座位(SeatsAvailability)聚合实例都有一个监听者,这将在不从这个高争用聚合中获得大量并发异常的情况下帮助我们进行扩展。

Conference.Processor.Azure.cs文件中的以下代码示例显示了系统如何创建一个专用的SessionSubscriptionReceiver实例来接收发送到可用座位(SeatsAvailability)聚合的消息:

var seatsAvailabilityCommandProcessor = new CommandProcessor(new SessionSubscriptionReceiver(azureSettings.ServiceBus, Topics.Commands.Path, Topics.Commands.Subscriptions.SeatsAvailability, false), serializer); ... container.RegisterInstance<IProcessor>("SeatsAvailabilityCommandProcessor", seatsAvailabilityCommandProcessor);

下面的代码示例显示了新的抽象SeatsAvailabilityCommand类,其中包含一个基于与该命令关联的会议的会话ID:

public abstract class SeatsAvailabilityCommand : ICommand, IMessageSessionProvider { public SeatsAvailabilityCommand() { this.Id = Guid.NewGuid(); } public Guid Id { get; set; } public Guid ConferenceId { get; set; } string IMessageSessionProvider.SessionId { get { return "SeatsAvailability_" + this.ConferenceId.ToString(); } } }

命令总线现在使用一个单独的订阅来订阅为可用座位(SeatsAvailability)聚合指定的命令。

Markus(软件开发人员)发言:

团队对RegistrationProcessManager流程管理器应用了类似的技术,为OrderPlaced事件创建单独的订阅来处理新订单。一个单独的订阅接收指定给流程管理器的所有其他事件。

缓存读模型的数据

作为V3版本中性能优化的一部分,团队为存储在订单和注册限界上下文读模型中的会议信息添加了缓存行为。这减少了读取这些常用数据所花费的时间。

下面的代码示例来自CachingConferenceDao类中的GetPublishedSeatTypes方法,展示了系统如何根据可用座位的数量决定是否缓存会议数据:

TimeSpan timeToCache; if (seatTypes.All(x => x.AvailableQuantity > 200 || x.AvailableQuantity <= 0)) { timeToCache = TimeSpan.FromMinutes(5); } else if (seatTypes.Any(x => x.AvailableQuantity < 30 && x.AvailableQuantity > 0)) { // There are just a few seats remaining. Do not cache. timeToCache = TimeSpan.Zero; } else if (seatTypes.Any(x => x.AvailableQuantity < 100 && x.AvailableQuantity > 0)) { timeToCache = TimeSpan.FromSeconds(20); } else { timeToCache = TimeSpan.FromMinutes(1); } if (timeToCache > TimeSpan.Zero) { this.cache.Set(key, seatTypes, new CacheItemPolicy { AbsoluteExpiration = DateTimeOffset.UtcNow.Add(timeToCache) }); }

Jana(软件架构师)发言:

您可以看到,通过调整缓存时间,甚至决定根本不缓存数据,我们是如何管理“显示陈旧数据”相关的风险。

系统现在还使用缓存来保存PricedOrderViewModelGenerator类中的座位类型描述。

使用多个Topic划分服务总线

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspxg.html