[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

ABP vNext 封装了两种事件总线结构,第一种是 ABP vNext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,ABP vNext 自己封装了一个抽象层进行定义,并使用 RabbitMQ 编写了一个基本实现。

在使用方式上,两种事件总线的作用基本相同。

事件总线分布在两个模块,在 Volo.Abp.EventBus 模块内部,定义了事件总线的抽象接口,以及本地事件总线 (ILocalEventBus) 的实现。分布式事件总线的具体实现,是在 Volo.Abp.EventBus.RabbitMQ 模块内部进行定义,从项目名称可以看出来,这个模块是基于 RabbitMQ 消息队列实现的。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

但是该项目并不是直接引用 RabbitMQ.Client 包,而是在 Volo.Abp.RabbitMQ 项目内部引用。这是因为除了分布式事件总线以外,ABP 还基于 RabbitMQ 实现了一个后台作业管理器。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

ABP vNext 框架便将一些对象抽象出来,放在 Volo.Abp.RabbitMQ 项目内部进行定义和实现。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

二、源码分析 2.1 事件处理器的注册

分析源码,首先从一个项目的模块开始,Volo.Abp.EventBus 库的模块 AbpEventBusModule 只干了一件事情。在组件注册的时候,根据组件的实现接口 (ILocalEventHandler 或 IDistributedEventHandler) 不同,将其赋值给 AbpLocalEventBusOptions 与 AbpDistributedEventBusOptions 的 Handlers 属性。

也就是说,开发人员定义的事件处理程序 (Handler) 都会在依赖注入的时候,都会将其类型 (Type) 添加到事件总线的配置类当中,方便后续进行使用。

2.2 事件总线的接口

通过事件总线模块的单元测试我们可以知道,事件的发布与订阅都是通过 IEventBus 的两个子接口 (ILocalEventBus/IDistributedEventBus) 进行的。在 IEventBus 接口的定义中,有三种行为,分别是 发布订阅取消订阅

对于 ILocalEventBus 接口和 IDistributedEventBus 接口来说,它们都提供了一个,针对本地事件处理器和分布式处理器的特殊订阅方法。

ILocalEventBus

/// <summary> /// Defines interface of the event bus. /// </summary> public interface ILocalEventBus : IEventBus { /// <summary> /// Registers to an event. /// Same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam>Event type</typeparam> /// <param>Object to handle the event</param> IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class; }

IDistributedEventBus

public interface IDistributedEventBus : IEventBus { /// <summary> /// Registers to an event. /// Same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam>Event type</typeparam> /// <param>Object to handle the event</param> IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class; } 2.3 事件总线基本流程和实现

同其他模块一样,因为有分布式事件总线和本地事件总线,ABP vNext 同样抽象了一个 EventBusBase 类型,作为它们的基类实现。

一般的流程,我们是先定义某个事件,然后订阅该事件并指定事件处理器,最后在某个时刻发布事件。例如下面的代码:

首先定义了一个事件处理器,专门用于处理 EntityChangedEventData<MyEntity> 事件。

public class MyEventHandler : ILocalEventHandler<EntityChangedEventData<MyEntity>> { public int EntityChangedEventCount { get; set; } public Task HandleEventAsync(EntityChangedEventData<MyEntity> eventData) { EntityChangedEventCount++; return Task.CompletedTask; } } var handler = new MyEventHandler(); LocalEventBus.Subscribe<EntityChangedEventData<MyEntity>>(handler); await LocalEventBus.PublishAsync(new EntityCreatedEventData<MyEntity>(new MyEntity())); 2.3.1 事件的订阅

可以看到,这里使用的是 ILocalEventBus 定义的订阅方法,跳转到内部实现,它还是调用的 EventBus 的方法。

public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class { // 调用基类的 Subscribe 方法,并传递 TEvent 的类型,和事件处理器。 return Subscribe(typeof(TEvent), handler); } public virtual IDisposable Subscribe(Type eventType, IEventHandler handler) { return Subscribe(eventType, new SingleInstanceHandlerFactory(handler)); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspdgg.html