EventBus/EventQueue 再思考 Intro
之前写过两篇文章,造轮子系列的 EventBus/EventQueue,回想起来觉得当前的想法有点问题,当时对 EvenStore 可能有点误解,有兴趣可以参考 https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html/https://www.cnblogs.com/weihanli/p/implement-event-queue.html,
最近把 Event 相关的逻辑做了一个重构,修改 EventStore,重新设计了 Event 相关的组件
重构后的 EventEvent: 事件的抽象定义
EventHandler:事件处理器抽象定义
EventHandlerFactory:事件处理器工厂,用来根据事件类型获取事件处理器(新增)
EventPublisher:事件发布器,用于事件发布
EventSubscriber:事件订阅器,用于管理事件的订阅
EventSubscriptionManager:事件订阅管理器,在 EventSubscriber 的基础上增加了一个根据事件类型获取事件订阅器类型的方法
EventBus:事件总线,由 EventPubliser 和 EventSubscriber 组合而成,用来比较方便的做事件发布和订阅
EventQueue:事件队列,希望某些消息顺序处理的时候可以考虑用 EventQueue 的模式
EventStore:事件存储,事件的持久化存储(在之前的版本里,EventStore 实际作用是一个 EventSubscriptionManager,在最近的版本更新中已修改)
以上 EventSubscriber 和 EventSubscriptionManager 一般不直接用,一般用 EventBus 来处理即可
EventHandlerFactory这次引入了 EventHandlerFactory 用来抽象获取 EventHandler 的逻辑,原来的设计里是在处理 Event 的时候获取 EventHandler 的类型,然后从依赖注入框架中获取或创建新的 event handler 实例之后再调用 EventHandler 的 Handle 方法处理事件,有一些冗余
使用 EventHandlerFactory 之后就可以直接获取一个 EventHandler 实例集合,具体是实例化还是从依赖注入中获取就由 EventHandlerFactory 来决定了,这样就可以对依赖注入很友好,对于基于内存的简单 EventBus 来说,在服务注册之后就不需要再调用 Subscribe 去显式订阅了,因为再注册服务的时候就已经隐式实现了订阅的逻辑,这样实际就不需要 EventSubscriptionManager 来管理订阅了,订阅信息都在依赖注入框架内部,比如说 CounterEvent,要获取它的订阅信息,我只需要从依赖注入框架中获取 IEventHandler<CounterEvent> 的实例即可,实际就代替了原先 “EventStoreInMemory”,现在的 EventSubscriptionManagerInMemory
基于依赖注入的 EventHandlerFactory 定义:
public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory { private readonly IServiceProvider _serviceProvider; public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider = null) { _serviceProvider = serviceProvider ?? DependencyResolver.Current; } public ICollection<IEventHandler> GetHandlers(Type eventType) { var eventHandlerType = typeof(IEventHandler<>).MakeGenericType(eventType); return _serviceProvider.GetServices(eventHandlerType).Cast<IEventHandler>().ToArray(); } }如果不使用依赖注入,也可以根据 IEventSubscriptionManager 订阅信息来实现:
public sealed class DefaultEventHandlerFactory : IEventHandlerFactory { private readonly IEventSubscriptionManager _subscriptionManager; private readonly ConcurrentDictionary<Type, ICollection<IEventHandler>> _eventHandlers = new ConcurrentDictionary<Type, ICollection<IEventHandler>>(); private readonly IServiceProvider _serviceProvider; public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider = null) { _subscriptionManager = subscriptionManager; _serviceProvider = serviceProvider ?? DependencyResolver.Current; } public ICollection<IEventHandler> GetHandlers(Type eventType) { var eventHandlers = _eventHandlers.GetOrAdd(eventType, type => { var handlerTypes = _subscriptionManager.GetEventHandlerTypes(type); var handlers = handlerTypes .Select(t => (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t)) .ToArray(); return handlers; }); return eventHandlers; } } EventQueue Demo来看一下 EventQueue 的示例,示例基于 asp.net core 的,定义了一个 HostedService 来实现一个 EventConsumer 来消费 EventQueue 中的事件信息
EventConsumer 定义如下:
public class EventConsumer : BackgroundService { private readonly IEventQueue _eventQueue; private readonly IEventHandlerFactory _eventHandlerFactory; public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory) { _eventQueue = eventQueue; _eventHandlerFactory = eventHandlerFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { var queues = await _eventQueue.GetQueuesAsync(); if (queues.Count > 0) { await queues.Select(async q => { var @event = await _eventQueue.DequeueAsync(q); if (null != @event) { var handlers = _eventHandlerFactory.GetHandlers(@event.GetType()); if (handlers.Count > 0) { await handlers .Select(h => h.Handle(@event)) .WhenAll() ; } } }) .WhenAll() ; } await Task.Delay(1000, stoppingToken); } } }