本讨论假设您已经基本了解了Azure服务总线队列和Topic之间的区别。有关Azure服务总线的介绍,请参阅参考指南中的“参考实现中使用的技术”。
使用上图所示的实现,有两件事是必要的,以确保命令只有单个处理程序。首先,Azure服务总线中应该保证只有一个会议/命令Topic的订阅。请记住,Azure服务总线主题是可以有多个订阅者的。其次,CommandProcessor应该为它接收到的每个命令调用一个处理程序。Azure服务总线中没有办法将主题限制为单个订阅。因此,开发人员必须自己小心的为命令的Topic创建单个订阅。
Gary(CQRS专家)发言:另一个问题是确保处理程序从Topic获取命令后只处理一次。您必须确保命令是幂等的,或者系统保证只处理命令一次。该团队将在旅程的后期处理这个问题。有关更多信息,请参见旅程第7章“增添适应能力和优化性能”。 备注:可能会运行多个SubscriptionReceiver实例,因为可以同时部署运行多个工作服务。如果多个SubscriptionReceiver实例可以接收来自同一主题订阅的消息,那么第一个调用SubscriptionClient对象上的Receive方法的实例将获取并处理该命令。
另一种方法是使用Azure服务总线队列代替Topic来传递命令。Azure服务总线队列与Topic的不同之处在于,它们的设计目的是将消息传递给单个接收者,而不是通过多个订阅传递给多个接收者。开发人员计划更详细的评估这个方案,以便在项目的稍后部分用此方案来实现。
下面来自SubscriptionReceiver类的代码示例显示了它如何接收来自Topic订阅的消息。
private SubscriptionClient client; ... private void ReceiveMessages(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { BrokeredMessage message = null; try { message = this.receiveRetryPolicy.ExecuteAction<BrokeredMessage>(this.DoReceiveMessage); } catch (Exception e) { Trace.TraceError("An unrecoverable error occurred while trying to receive a new message:\r\n{0}", e); throw; } try { if (message == null) { Thread.Sleep(100); continue; } this.MessageReceived(this, new BrokeredMessageEventArgs(message)); } finally { if (message != null) { message.Dispose(); } } } } protected virtual BrokeredMessage DoReceiveMessage() { return this.client.Receive(TimeSpan.FromSeconds(10)); } Jana(软件架构师)发言:此代码示例展示了系统如何使用Transient Fault Handling Application Block可靠地从Topic获取消息。
Azure服务总线SubscriptionClient类使用peek/lock技术从订阅中获取消息。在代码示例中,Receive方法在订阅时锁定消息。当消息被锁定时,其他客户端无法看到它。然后Receive方法尝试处理消息。如果客户端成功处理消息,则调用Complete方法:这将从订阅中删除消息。否则,如果客户端未能成功处理该消息,则调用Abandon方法:这将释放消息上的锁,然后相同的客户端或不同的客户端就可以继续接收它。如果客户端在固定的时间内没有调用Complete方法或Abandon方法,则也会释放消息上的锁。
MessageReceived事件将一个引用传递给SubscriptionReceiver实例,以便处理程序在处理消息时可以调用Complete方法或Abandon方法。下面来自MessageProcessor类的代码示例展示了如何使用BrokeredMessage实例作为MessageReceived事件的参数以及如何使用它调用Complete和Abandon方法。
private void OnMessageReceived(object sender, BrokeredMessageEventArgs args) { var message = args.Message; object payload; using (var stream = message.GetBody<Stream>()) using (var reader = new StreamReader(stream)) { payload = this.serializer.Deserialize(reader); } try { ... ProcessMessage(payload); ... } catch (Exception e) { if (args.Message.DeliveryCount > MaxProcessingRetries) { Trace.TraceWarning("An error occurred while processing a new message and will be dead-lettered:\r\n{0}", e); message.SafeDeadLetter(e.Message, e.ToString()); } else { Trace.TraceWarning("An error occurred while processing a new message and will be abandoned:\r\n{0}", e); message.SafeAbandon(); } return; } Trace.TraceInformation("The message has been processed and will be completed."); message.SafeComplete(); }备注:本示例使用可靠的Transient Fault Handling Application Block,并使用扩展方法调用BrokeredMessage的Complete方法和Abandon方法。
为什么分为CommandBus和EventBus?