CQRS之旅——旅程7(增加弹性和优化性能) (10)

Markus(软件开发人员)发言:

作为添加此功能的副作用,EventProcessor类在将事件转发给处理程序时,不能再使用dynamic关键字。现在在V3中,它使用了新的EventDispatcher类,该类使用反射来标识给定消息类型的正确处理程序。

在性能测试期间,团队发现了这个特定的SeatsReserved事件的另一个问题。由于系统在加载时其他地方出现了延迟,因此第二份SeatsReserved事件被发布了。然后,这个Handle方法抛出一个异常,导致系统在将消息发送到dead-letter队列之前多次重试处理该消息。为了解决这个特定的问题,团队修改了这个方法,添加了else if子句,如下面的代码示例所示:

public void Handle(Envelope<SeatsReserved> envelope) { if (this.State == ProcessState.AwaitingReservationConfirmation) { ... } else if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) == 0) { Trace.TraceInformation("Seat reservation response for request {1} for reservation id {0} was already handled. Skipping event.", envelope.Body.ReservationId, envelope.CorrelationId); } else { throw new InvalidOperationException("Cannot handle seat reservation at this stage."); } }

Markus(软件开发人员)发言:

此优化仅应用于此特定消息。注意,它使用了之前保存在实例中的SeatReservationCommandId属性的值。如果希望对其他消息执行这种检查,则需要在流程管理器中存储更多信息。

检测重复的OrderPlaced事件

为了检测重复的OrderPlaced事件,RegistrationProcessManagerRouter类现在执行一个检查,以查看事件是否已经被处理。V3版本的新代码如下面的代码示例所示:

public void Handle(OrderPlaced @event) { using (var context = this.contextFactory.Invoke()) { var pm = context.Find(x => x.OrderId == @event.SourceId); if (pm == null) { pm = new RegistrationProcessManager(); } pm.Handle(@event); context.Save(pm); } } 当RegistrationProcessManager类保存状态并发送命令时创建伪事务

Azure中不可能有包含将RegistrationProcessManager持久化到存储里并发送命令的事务。因此,团队决定保存流程管理器生成的所有命令,以便在流程崩溃时不会丢失这些命令,它们可以稍后发送。我们使用另一个进程来可靠地处理发送命令。

Markus(软件开发人员)发言:

已经迁移到V3版本的迁移实用程序更新了数据库模式,以适应新的存储需求。

下面来自SqlProcessDataContext类的代码示例显示了系统如何持久化所有命令以及进程管理器的状态:

public void Save(T process) { var entry = this.context.Entry(process); if (entry.State == System.Data.EntityState.Detached) this.context.Set<T>().Add(process); var commands = process.Commands.ToList(); UndispatchedMessages undispatched = null; if (commands.Count > 0) { // If there are pending commands to send, we store them as undispatched. undispatched = new UndispatchedMessages(process.Id) { Commands = this.serializer.Serialize(commands) }; this.context.Set<UndispatchedMessages>().Add(undispatched); } try { this.context.SaveChanges(); } catch (DbUpdateConcurrencyException e) { throw new ConcurrencyException(e.Message, e); } this.DispatchMessages(undispatched, commands); }

下面来自SqlProcessDataContext类的代码示例展示了系统如何发送命令消息:

private void DispatchMessages(UndispatchedMessages undispatched, List<Envelope<ICommand>> deserializedCommands = null) { if (undispatched != null) { if (deserializedCommands == null) { deserializedCommands = this.serializer.Deserialize<IEnumerable<Envelope<ICommand>>>(undispatched.Commands).ToList(); } var originalCommandsCount = deserializedCommands.Count; try { while (deserializedCommands.Count > 0) { this.commandBus.Send(deserializedCommands.First()); deserializedCommands.RemoveAt(0); } } catch (Exception) { // We catch a generic exception as we don't know what implementation of ICommandBus we might be using. if (originalCommandsCount != deserializedCommands.Count) { // If we were able to send some commands, then update the undispatched messages. undispatched.Commands = this.serializer.Serialize(deserializedCommands); try { this.context.SaveChanges(); } catch (DbUpdateConcurrencyException) { // If another thread already dispatched the messages, ignore and surface original exception instead. } } throw; } // We remove all the undispatched messages for this process manager. this.context.Set<UndispatchedMessages>().Remove(undispatched); this.retryPolicy.ExecuteAction(() => this.context.SaveChanges()); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspxg.html