CQRS之旅——旅程6(我们系统的版本管理) (7)

系统从ConferenceProcessor类中的OnStart方法配置Azure服务总线Topic和订阅。Settings.xml配置文件中的配置指定了具体的订阅使用会话。ServiceBusConfig类中的以下代码示例显示了系统如何创建和配置订阅。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription) { var subscriptionDescription = new SubscriptionDescription(topic.Path, subscription.Name) { RequiresSession = subscription.RequiresSession }; try { namespaceManager.CreateSubscription(subscriptionDescription); } catch (MessagingEntityAlreadyExistsException) { } }

以下来自SessionSubscriptionReceiver类的代码示例演示了如何使用会话接收消息:

private void ReceiveMessages(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { MessageSession session; try { session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession); } catch (Exception e) { ... } if (session == null) { Thread.Sleep(100); continue; } while (!cancellationToken.IsCancellationRequested) { BrokeredMessage message = null; try { try { message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero)); } catch (Exception e) { ... } if (message == null) { // If we have no more messages for this session, exit and try another. break; } this.MessageReceived(this, new BrokeredMessageEventArgs(message)); } finally { if (message != null) { message.Dispose(); } } } this.receiveRetryPolicy.ExecuteAction(() => session.Close()); } } private MessageSession DoAcceptMessageSession() { try { return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45)); } catch (TimeoutException) { return null; } }

CQRS之旅——旅程6(我们系统的版本管理)

Markus(软件开发人员)发言:

您可能会发现,将使用消息会话的ReceiveMessages方法的这个版本与SubscriptionReceiver类中的原始版本进行比较是很有用的。

您必须确保当你发送消息包含一个会话ID,这样才能使用消息会话接收一条消息。系统使用事件的SourceID作为会话ID,如下面的代码示例所示的EventBus类中的BuildMessage方法:

var message = new BrokeredMessage(stream, true); message.SessionId = @event.SourceId.ToString();

通过这种方式,您可以确保以正确的顺序接收来自单个源的所有消息。

CQRS之旅——旅程6(我们系统的版本管理)

Poe(IT运维人员)发言:

在V2版本中,团队更改了系统创建Azure服务总线Topic和订阅的方式。之前,SubscriptionReceiver类创建了它们(如果它们还不存在)。现在,系统在应用程序启动时使用配置数据创建它们。这发生在启动过程的早期,以避免在系统初始化订阅之前将消息发送到Topic时丢失消息的风险。

然而,只有当消息按正确的顺序传递到总线上时,会话才能保证按顺序传递消息。如果系统异步发送消息,则必须特别注意确保消息以正确的顺序放在总线上。在我们的系统中,来自每个单独聚合实例的事件按顺序到达是很重要的,但是我们不关心来自不同聚合实例的事件的顺序。因此,尽管系统异步发送事件,EventStoreBusPublisher实例仍然会在发送下一个事件之前等待前一个事件已发送的确认。以下来自TopicSender类的示例说明了这一点:

public void Send(Func<BrokeredMessage> messageFactory) { var resetEvent = new ManualResetEvent(false); Exception exception = null; this.retryPolicy.ExecuteAction( ac => { this.DoBeginSendMessage(messageFactory(), ac); }, ar => { this.DoEndSendMessage(ar); }, () => resetEvent.Set(), ex => { Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex); exception = ex; resetEvent.Set(); }); resetEvent.WaitOne(); if (exception != null) { throw exception; } }

CQRS之旅——旅程6(我们系统的版本管理)

Jana(软件架构师)发言:

此代码示例展示了系统如何使用Transient Fault Handling Application Block来让异步调用可靠。

有关消息排序和Azure服务总线的更多信息,请参见Microsoft Azure Queues and Microsoft Azure Service Bus Queues - Compared and Contrasted

有关异步发送消息和排序的信息,请参阅博客文章Microsoft Azure Service Bus Splitter and Aggregator

从会议管理限界上下文中持久化事件

团队决定创建一个包含所有发送的命令和事件的消息日志。这将使订单和注册限界上下文能够从会议管理限界上下文查询此日志,以获取其构建读模型所需的事件。这不是事件源,因为我们没有使用这些事件来重建聚合的状态,尽管我们使用类似的技术来捕获和持久化这些集成事件。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgw.html