CQRS之旅——旅程7(增加弹性和优化性能) (13)

注意这个类是如何尝试在不使用服务总线的情况下同步发送命令,但是如果它找不到该命令的处理程序,它将返回到使用服务总线。下面的代码示例展示了CommandDispatcher类如何试图定位处理程序并传递命令消息:

public bool ProcessMessage(string traceIdentifier, ICommand payload, string messageId, string correlationId) { var commandType = payload.GetType(); ICommandHandler handler = null; if (this.handlers.TryGetValue(commandType, out handler)) { Trace.WriteLine("-- Handled by " + handler.GetType().FullName + traceIdentifier); ((dynamic)handler).Handle((dynamic)payload); return true; } else { return false; } } 使用memento模式实现快照

在Contoso会议管理系统中,唯一有事件源的聚合就是可用座位(SeatAvailability)聚合。它可能包含大量的事件并且可以从快照中获益。

Markus(软件开发人员)发言:

因为我们选择使用memento模式,所以聚合状态的快照存储在memento中。

下面的代码示例来自AzureEventSourcedRepository类中的Save方法,展示了如果存在缓存且聚合实现了IMementoOriginator接口,系统如何创建缓存的memento对象。

public void Save(T eventSourced, string correlationId) { ... this.cacheMementoIfApplicable.Invoke(eventSourced); }

然后,当系统调用AzureEventSourcedRepository类中的Find方法加载聚合时,它会检查是否有缓存的memento其中包含要使用对象状态的快照:

private readonly Func<Guid, Tuple<IMemento, DateTime?>> getMementoFromCache; ... public T Find(Guid id) { var cachedMemento = this.getMementoFromCache(id); if (cachedMemento != null && cachedMemento.Item1 != null) { IEnumerable<IVersionedEvent> deserialized; if (!cachedMemento.Item2.HasValue || cachedMemento.Item2.Value < DateTime.UtcNow.AddSeconds(-1)) { deserialized = this.eventStore.Load(GetPartitionKey(id), cachedMemento.Item1.Version + 1).Select(this.Deserialize); } else { deserialized = Enumerable.Empty<IVersionedEvent>(); } return this.originatorEntityFactory.Invoke(id, cachedMemento.Item1, deserialized); } else { var deserialized = this.eventStore.Load(GetPartitionKey(id), 0) .Select(this.Deserialize) .AsCachedAnyEnumerable(); if (deserialized.Any()) { return this.entityFactory.Invoke(id, deserialized); } } return null; }

如果缓存条目在最近几秒钟内更新了,那么它很有可能没有过期,因为我们只有一个写入者用于高争用聚合。因此,当memento创建之后,我们乐观的不用在事件存储中检查新事件。否则,我们需要在事件存储中检查创建memento之后到达的事件。

下面的代码示例显示了SeatsAvailability类如何将其状态数据快照添加到要缓存的memento对象中:

public IMemento SaveToMemento() { return new Memento { Version = this.Version, RemainingSeats = this.remainingSeats.ToArray(), PendingReservations = this.pendingReservations.ToArray(), }; } 并行发布事件

在第5章“准备发布V1版本”中,您了解了系统如何在将事件保存到事件存储时发布事件。这种优化使系统能够并行发布其中一些事件,而不是按顺序发布。重要的是,与特定聚合实例关联的事件必须按照正确的顺序发送,因此系统只为不同的分区键创建新任务。下面的代码示例来自EventStoreBusPublisher类中的Start方法,展示了如何定义并行任务:

Task.Factory.StartNew( () => { try { foreach (var key in GetThrottlingEnumerable(this.enqueuedKeys.GetConsumingEnumerable(cancellationToken), this.throttlingSemaphore, cancellationToken)) { if (!cancellationToken.IsCancellationRequested) { ProcessPartition(key); } else { this.enqueuedKeys.Add(key); return; } } } catch (OperationCanceledException) { return; } }, TaskCreationOptions.LongRunning);

SubscriptionReceiverSessionSubscriptionReceiver类使用相同的DynamicThrottling类来动态限制从服务总线检索消息的速度。

在订阅中过滤消息

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspxg.html