CQRS之旅——旅程7(增加弹性和优化性能) (12)

实现此功能的关键变更是在订单(Order)聚合里。Order类中的构造函数现在调用CalculateTotal方法并引发OrderTotalsCalculated事件,如下面的代码示例所示:

public Order(Guid id, Guid conferenceId, IEnumerable<OrderItem> items, IPricingService pricingService) : this(id) { var all = ConvertItems(items); var totals = pricingService.CalculateTotal(conferenceId, all.AsReadOnly()); this.Update(new OrderPlaced { ConferenceId = conferenceId, Seats = all, ReservationAutoExpiration = DateTime.UtcNow.Add(ReservationAutoExpiration), AccessCode = HandleGenerator.Generate(6) }); this.Update(new OrderTotalsCalculated { Total = totals.Total, Lines = totals.Lines != null ? totals.Lines.ToArray() : null, IsFreeOfCharge = totals.Total == 0m }); }

之前,在V2版本中,订单(Order)聚合一直等到收到MarkAsReserved命令才调用CalculateTotal方法。

异步接收、完成和发送消息

本节概述了系统现在如何异步地在Azure服务总线上执行所有I/O。

异步接收消息

SubscriptionReceiverSessionSubscriptionReceiver类现在异步接收消息,而不是在ReceiveMessages方法的循环中同步接收消息。

有关详细信息,请参阅SubscriptionReceiver类中的ReceiveMessages方法或SessionSubscriptionReceiver类中的ReceiveMessagesAndCloseSession方法。

Markus(软件开发人员)发言:

此代码示例还展示了如何使用Transient Fault Handling Application Block来可靠地异步接收来自服务总线Topic的消息。异步循环使代码更难以读取,但效率更高。这是推荐的最佳实践。这段代码将受益于c# 4中新的async关键字。

异步完成消息

系统使用peek/lock机制从服务总线Topic订阅中检索消息。要了解系统如何异步执行这些操作,请参阅SubscriptionReceiverSessionSubscriptionReceiver类中的ReceiveMessages方法。这提供了一个系统如何使用异步api的例子。

异步发送消息

应用程序现在异步发送服务总线上的所有消息。有关详细信息,请参见TopicSender类。

在进程中同步处理命令

在V2版本中,系统使用Azure服务总线将所有命令传递给它们的接收者。这意味着系统异步地交付命令。在V3版本中,MVC控制器现在同步地在进程中发送命令,以便通过绕过命令总线并将命令直接传递给处理程序来改进UI中的响应时间。此外,在ConferenceProcessor工作者角色中,发送到订单(Order)聚合的命令使用相同的机制在进程中同步发送。

Markus(软件开发人员)发言:

我们仍然异步地向可用座位(SeatsAvailability)聚合发送命令,因为随着RegistrationProcessManager的多个实例并行运行,将会出现争用,因为多个线程都试图访问可用座位(SeatsAvailability)聚合的同一个实例。

团队实现这种行为通过添加SynchronousCommandBusDecoratorCommandDispatcher类到基础设施并且在web角色启动的时候注册它们,如下面的代码展示了Global.asax.Azure.cs文件里的OnCreateContainer方法**:

var commandBus = new CommandBus(new TopicSender(settings.ServiceBus, "conference/commands"), metadata, serializer); var synchronousCommandBus = new SynchronousCommandBusDecorator(commandBus); container.RegisterInstance<ICommandBus>(synchronousCommandBus); container.RegisterInstance<ICommandHandlerRegistry>(synchronousCommandBus); container.RegisterType<ICommandHandler, OrderCommandHandler>("OrderCommandHandler"); container.RegisterType<ICommandHandler, ThirdPartyProcessorPaymentCommandHandler>("ThirdPartyProcessorPaymentCommandHandler"); container.RegisterType<ICommandHandler, SeatAssignmentsHandler>("SeatAssignmentsHandler"); 备注:在Conference.Azure.cs文件中也有类似的代码,用于配置工作角色,以便在进程中发送一些命令。

下面的代码示例展示了SynchronousCommandBusDecorator类如何实现命令消息的发送:

public class SynchronousCommandBusDecorator : ICommandBus, ICommandHandlerRegistry { private readonly ICommandBus commandBus; private readonly CommandDispatcher commandDispatcher; public SynchronousCommandBusDecorator(ICommandBus commandBus) { this.commandBus = commandBus; this.commandDispatcher = new CommandDispatcher(); } ... public void Send(Envelope<ICommand> command) { if (!this.DoSend(command)) { Trace.TraceInformation("Command with id {0} was not handled locally. Sending it through the bus.", command.Body.Id); this.commandBus.Send(command); } } ... private bool DoSend(Envelope<ICommand> command) { bool handled = false; try { var traceIdentifier = string.Format(CultureInfo.CurrentCulture, " (local handling of command with id {0})", command.Body.Id); handled = this.commandDispatcher.ProcessMessage(traceIdentifier, command.Body, command.MessageId, command.CorrelationId); } catch (Exception e) { Trace.TraceWarning("Exception handling command with id {0} synchronously: {1}", command.Body.Id, e.Message); } return handled; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspxg.html