CQRS之旅——旅程6(我们系统的版本管理) (8)

CQRS之旅——旅程6(我们系统的版本管理)

Gary(CQRS专家)发言:

此消息日志确保不会丢失任何消息,以便将来能够满足其他需求。

向消息添加额外元数据

系统现在将所有消息保存到消息日志中。为了方便查询特定命令或事件,系统现在向每个消息添加了更多的元数据。以前,惟一的元数据是事件类型,现在,事件元数据包括事件类型、命名空间、程序集和路径。系统将元数据添加到EventBus类中的事件和CommandBus类中的命令中。

捕获消息并将消息持久化到消息日志中

系统使用Azure服务总线中对会议/命令和会议/事件topic的额外订阅来接收系统中每条消息的副本。然后,它将消息附加到Azure表存储中。下面的代码示例显示了AzureMessageLogWriter类的实例,它用于将消息保存到表中:

public class MessageLogEntity : TableServiceEntity { public string Kind { get; set; } public string CorrelationId { get; set; } public string MessageId { get; set; } public string SourceId { get; set; } public string AssemblyName { get; set; } public string Namespace { get; set; } public string FullName { get; set; } public string TypeName { get; set; } public string SourceType { get; set; } public string CreationDate { get; set; } public string Payload { get; set; } }

Kind属性指定消息是命令还是事件。MessageId和CorrelationId属性由消息传递基础设施设置的,其余属性是从消息元数据中设置的。

下面的代码示例显示了这些消息的分区和RowKey的定义:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"), RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId

注意,RowKey保存了消息最初发送的顺序,并添加到消息ID上,以确保惟一性,以防两条消息同时入队。

CQRS之旅——旅程6(我们系统的版本管理)

Jana(软件架构师)发言:

这与事件存储不同,在事件存储区中,分区键标识聚合实例,而RowKey标识聚合的版本号。

数据迁移

当Contoso将系统从V1迁移到V2时,它将使用消息日志在订单和注册限界上下文中重建会议和价格订单的读模型。

CQRS之旅——旅程6(我们系统的版本管理)

Gary(CQRS专家)发言:

Contoso可以在需要重建与聚合无关的事件构建的读模型时来使用消息日志,例如来自会议管理限界上下文的集成事件。

会议读模型包含会议的信息,并包含来自会议管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

价格订单读模型持有来自于SeatCreated和SeatUpdated事件的信息,这些事件来自于会议管理限界上下文。

然而,在V1中,这些事件消息没有被持久化,因此读模型不能在V2中重新填充。为了解决这个问题,团队实现了一个数据迁移实用程序,它使用一种最佳方法来生成包含要存储在消息日志中的丢失数据的事件。例如,在迁移到V2之后,消息日志不包含ConferenceCreated事件,因此迁移实用程序在会议管理限界上下文使用的数据库中找到这些信息,并创建丢失的事件。您可以在MigrationToV2项目的Migrator类中的GeneratePastEventLogMessagesForConferenceManagement方法中看到这是如何完成的。

CQRS之旅——旅程6(我们系统的版本管理)

Markus(软件开发人员)发言:

您可以在这个类中看到,Contoso还将所有现有的事件源事件复制到消息日志中。

如下面所示,Migrator类中的RegenerateViewModels方法重新构建读取的模型。它通过调用Query方法从消息日志中检索所有事件,然后使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater类来处理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString) { var commandBus = new NullCommandBus(); Database.SetInitializer<ConferenceRegistrationDbContext>(null); var handlers = new List<IEventHandler>(); handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus)); handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString))); using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString)) { context.UpdateTables(); } try { var dispatcher = new MessageDispatcher(handlers); var events = logReader.Query(new QueryCriteria { }); dispatcher.DispatchMessages(events); } catch { using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString)) { context.RollbackTablesMigration(); } throw; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgw.html