[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ) (3)

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency { protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; } public LocalEventBus( IOptions<AbpLocalEventBusOptions> options, IServiceScopeFactory serviceScopeFactory) : base(serviceScopeFactory) { Options = options.Value; Logger = NullLogger<LocalEventBus>.Instance; HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>(); // 调用父类的方法,将模块初始化时扫描到的事件处理器,都尝试进行订阅。 SubscribeHandlers(Options.Handlers); } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { GetOrCreateHandlerFactories(eventType) // 锁住集合,以确保线程安全。 .Locking(factories => { // 如果在集合内部,已经有了对应的工厂,则不进行添加。 if (!factory.IsInFactories(factories)) { factories.Add(factory); } } ); // 返回一个事件处理器工厂注销器,当调用 Dispose() 方法时,会取消之前订阅的事件。 return new EventHandlerFactoryUnregistrar(this, eventType, factory); } private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) { // 根据事件的类型,从字典中获得该类型的所有事件处理器工厂。 return HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>()); } }

上述流程结合 EventBus 和 LocalEventBus 讲解了事件的订阅流程,事件的订阅操作都是对 HandlerFactories 的操作,往里面添加指定事件的事件处理器工厂,而每个工厂都是跟具体的事件处理器实例/类型进行关联的。

2.3.2 事件的发布

当开发人员需要发布事件的时候,一般都是通过对应的 EventBus,调用响应的 PublishAsync 方法,传递要触发的事件类型与事件数据。接口和基类当中,定义了两种发布方法的签名与实现:

public virtual Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class { return PublishAsync(typeof(TEvent), eventData); } public abstract Task PublishAsync(Type eventType, object eventData);

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

第二种方法一共也分为本地事件总线的实现,和分布式事件总线的实现,本地事件比较简单,我们先分析本地事件总线的实现。

public override async Task PublishAsync(Type eventType, object eventData) { // 定义了一个异常集合,用于接收多个事件处理器执行时,产生的所有异常。 var exceptions = new List<Exception>(); // 触发事件处理器。 await TriggerHandlersAsync(eventType, eventData, exceptions); // 如果有任何异常产生,则抛出到之前的调用栈。 if (exceptions.Any()) { if (exceptions.Count == 1) { exceptions[0].ReThrow(); } throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions); } }

可以看到真正的触发行为是在 TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions) 内部进行实现的。

protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions) { // 针对于这个的作用,等同于 ConfigureAwait(false) 。 // 具体可以参考 https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/。 await new SynchronizationContextRemover(); // 根据事件的类型,得到它的所有事件处理器工厂。 foreach (var handlerFactories in GetHandlerFactories(eventType)) { // 遍历所有的事件处理器工厂,通过 Factory 获得事件处理器,调用 Handler 的 HandleEventAsync 方法。 foreach (var handlerFactory in handlerFactories.EventHandlerFactories) { await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions); } } // 如果类型继承了 IEventDataWithInheritableGenericArgument 接口,那么会检测泛型参数是否有父类。 // 如果有父类,则会使用当前的事件数据,为其父类发布一个事件。 if (eventType.GetTypeInfo().IsGenericType && eventType.GetGenericArguments().Length == 1 && typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType)) { var genericArg = eventType.GetGenericArguments()[0]; var baseArg = genericArg.GetTypeInfo().BaseType; if (baseArg != null) { // 构造基类的事件类型,使用当前一样的泛型定义,只是泛型参数使用基类。 var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg); // 构建类型的构造参数。 var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs(); // 通过事件类型和构造参数,构造一个新的事件数据实例。 var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs); // 发布父类的同类事件。 await PublishAsync(baseEventType, baseEventData); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspdgg.html