ABP vNext 封装了两种事件总线结构,第一种是 ABP vNext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,ABP vNext 自己封装了一个抽象层进行定义,并使用 RabbitMQ 编写了一个基本实现。
在使用方式上,两种事件总线的作用基本相同。
事件总线分布在两个模块,在 Volo.Abp.EventBus 模块内部,定义了事件总线的抽象接口,以及本地事件总线 (ILocalEventBus) 的实现。分布式事件总线的具体实现,是在 Volo.Abp.EventBus.RabbitMQ 模块内部进行定义,从项目名称可以看出来,这个模块是基于 RabbitMQ 消息队列实现的。
但是该项目并不是直接引用 RabbitMQ.Client 包,而是在 Volo.Abp.RabbitMQ 项目内部引用。这是因为除了分布式事件总线以外,ABP 还基于 RabbitMQ 实现了一个后台作业管理器。
ABP vNext 框架便将一些对象抽象出来,放在 Volo.Abp.RabbitMQ 项目内部进行定义和实现。
二、源码分析 2.1 事件处理器的注册分析源码,首先从一个项目的模块开始,Volo.Abp.EventBus 库的模块 AbpEventBusModule 只干了一件事情。在组件注册的时候,根据组件的实现接口 (ILocalEventHandler 或 IDistributedEventHandler) 不同,将其赋值给 AbpLocalEventBusOptions 与 AbpDistributedEventBusOptions 的 Handlers 属性。
也就是说,开发人员定义的事件处理程序 (Handler) 都会在依赖注入的时候,都会将其类型 (Type) 添加到事件总线的配置类当中,方便后续进行使用。
2.2 事件总线的接口通过事件总线模块的单元测试我们可以知道,事件的发布与订阅都是通过 IEventBus 的两个子接口 (ILocalEventBus/IDistributedEventBus) 进行的。在 IEventBus 接口的定义中,有三种行为,分别是 发布、订阅、取消订阅。
对于 ILocalEventBus 接口和 IDistributedEventBus 接口来说,它们都提供了一个,针对本地事件处理器和分布式处理器的特殊订阅方法。
ILocalEventBus:
/// <summary> /// Defines interface of the event bus. /// </summary> public interface ILocalEventBus : IEventBus { /// <summary> /// Registers to an event. /// Same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam>Event type</typeparam> /// <param>Object to handle the event</param> IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class; }IDistributedEventBus:
public interface IDistributedEventBus : IEventBus { /// <summary> /// Registers to an event. /// Same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam>Event type</typeparam> /// <param>Object to handle the event</param> IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class; } 2.3 事件总线基本流程和实现同其他模块一样,因为有分布式事件总线和本地事件总线,ABP vNext 同样抽象了一个 EventBusBase 类型,作为它们的基类实现。
一般的流程,我们是先定义某个事件,然后订阅该事件并指定事件处理器,最后在某个时刻发布事件。例如下面的代码:
首先定义了一个事件处理器,专门用于处理 EntityChangedEventData<MyEntity> 事件。
public class MyEventHandler : ILocalEventHandler<EntityChangedEventData<MyEntity>> { public int EntityChangedEventCount { get; set; } public Task HandleEventAsync(EntityChangedEventData<MyEntity> eventData) { EntityChangedEventCount++; return Task.CompletedTask; } } var handler = new MyEventHandler(); LocalEventBus.Subscribe<EntityChangedEventData<MyEntity>>(handler); await LocalEventBus.PublishAsync(new EntityCreatedEventData<MyEntity>(new MyEntity())); 2.3.1 事件的订阅可以看到,这里使用的是 ILocalEventBus 定义的订阅方法,跳转到内部实现,它还是调用的 EventBus 的方法。
public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class { // 调用基类的 Subscribe 方法,并传递 TEvent 的类型,和事件处理器。 return Subscribe(typeof(TEvent), handler); } public virtual IDisposable Subscribe(Type eventType, IEventHandler handler) { return Subscribe(eventType, new SingleInstanceHandlerFactory(handler)); }