看eShopOnContainers学一个EventBus

最近在看微软eShopOnContainers 项目,看到EventBus觉得不错,和大家分享一下

看完此文你将获得什么?

eShop中是如何设计事件总线的

实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战

发布订阅模式

发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

eShop中的EventBus就是基于这种模式的发布/订阅。
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例

在eShop中有两个EventBus的实现:

基于RabbitMq的EventBusRabbitMQ

基于AzureServiceBus的EventBusServiceBus。

从IEventBus开始

先来看一看,所有EventBus的接口IEventBus

public interface IEventBus { void Publish(IntegrationEvent @event); void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; }

嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:

Publish 发布

Subscribe 订阅

Unsubscribe 取消订阅

这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:

发布的内容(消息)必须是IntegrationEvent及其子类

订阅事件必须指明要订阅事件的类型,并附带处理器类型

处理器必须是IIntegrationEventHandler的实现类

Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:

事件必须继承IntegrationEvent

处理器必须实现IIntegrationEventHandler<T>且T是IntegrationEvent子类

另外,看下 IntegrationEvent有什么

public class IntegrationEvent { public IntegrationEvent() { Id = Guid.NewGuid(); CreationDate = DateTime.UtcNow; } public Guid Id { get; } public DateTime CreationDate { get; } } IEventBusSubscriptionsManager是什么 public interface IEventBusSubscriptionsManager { bool IsEmpty { get; } event EventHandler<string> OnEventRemoved; void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent; bool HasSubscriptionsForEvent(string eventName); Type GetEventTypeByName(string eventName); void Clear(); IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent; IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); string GetEventKey<T>(); }

这个接口看起来稍显复杂些,我们来简化下看看:

public interface IEventBusSubscriptionsManager { void AddSubscription<T, TH>() void RemoveSubscription<T, TH>() IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() }

最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。

SubscriptionInfo是什么? public bool IsDynamic { get; } public Type HandlerType{ get; }

SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。

这是你可能会有另一个疑问:

这个和IEventBus有什么关系?

IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等

IEventBusSubscriptionsManager由IEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:

public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { // 查询事件的全名 var eventName = _subsManager.GetEventKey<T>(); //向mq添加注册 DoInternalSubscription(eventName); // 向manager添加订阅 _subsManager.AddSubscription<T, TH>(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }

查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywxxp.html