[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ) (4)

在上述代码内部,都还没有真正执行事件处理器,真正的事件处理器执行程序是在下面的方法进行执行的。ABP vNext 通过引入 IEventDataWithInheritableGenericArgument 接口,实现了 类型继承事件 的触发,该接口提供了一个 GetConstructorArgs() 方法定义,方便后面生成构造参数。

例如有一个基础事件叫做 EntityEventData<Student>,如果 Student 继承自 Person,那么在触发该事件的时候,也会发布一个 EntityEventData<Person> 事件。

2.3.3 事件处理器的执行

真正事件处理器的执行,是通过下面的方法实现的,大概思路就是通过事件总线工厂,构建了事件处理器的实例。通过反射,调用事件处理器的 HandleEventAsync() 方法。如果在处理过程当中,出现了异常,则将异常数据放置在 List<Exception> 集合当中。

protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions) { using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler()) { try { // 获得事件处理器的类型。 var handlerType = eventHandlerWrapper.EventHandler.GetType(); // 判断事件处理器是本地事件还是分布式事件。 if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>))) { // 获得方法定义。 var method = typeof(ILocalEventHandler<>) .MakeGenericType(eventType) .GetMethod( nameof(ILocalEventHandler<object>.HandleEventAsync), new[] { eventType } ); // 使用工厂创建的实例调用方法。 await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); } else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>))) { var method = typeof(IDistributedEventHandler<>) .MakeGenericType(eventType) .GetMethod( nameof(IDistributedEventHandler<object>.HandleEventAsync), new[] { eventType } ); await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }); } else { // 如果都不是,则说明类型不正确,抛出异常。 throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName); } } // 捕获到异常都统一添加到异常集合当中。 catch (TargetInvocationException ex) { exceptions.Add(ex.InnerException); } catch (Exception ex) { exceptions.Add(ex); } } } 2.4 分布式事件总线

分布式事件总线的实现都存放在 Volo.Abp.EventBus.RabbitMQ,该项目的代码比较少,由三个文件构成。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

在 RabbitMQ 模块的内部,只干了两件事情。首先从 JSON 配置文件当中,获取 AbpRabbitMqEventBusOptions 配置的三个参数,然后解析 RabbitMqDistributedEventBus 实例,并调用初始化方法 (Initialize())。

[DependsOn( typeof(AbpEventBusModule), typeof(AbpRabbitMqModule))] public class AbpEventBusRabbitMqModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { var configuration = context.Services.GetConfiguration(); // 从配置文件读取配置。 Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus")); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { // 调用初始化方法。 context .ServiceProvider .GetRequiredService<RabbitMqDistributedEventBus>() .Initialize(); } } 2.4.1 分布式事件总线的初始化 public void Initialize() { // 创建一个消费者,并配置交换器和队列。 Consumer = MessageConsumerFactory.Create( new ExchangeDeclareConfiguration( AbpRabbitMqEventBusOptions.ExchangeName, type: "direct", durable: true ), new QueueDeclareConfiguration( AbpRabbitMqEventBusOptions.ClientName, durable: true, exclusive: false, autoDelete: false ), AbpRabbitMqEventBusOptions.ConnectionName ); // 消费者在消费消息的时候,具体的执行逻辑。 Consumer.OnMessageReceived(ProcessEventAsync); // 调用基类的方法,自动订阅对应的事件。 SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); } 2.4.2 分布式事件的订阅

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspdgg.html