在定义分布式事件的时候,我们必须使用 EventNameAttribute 为事件声明路由键。
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } handlerFactories.Add(factory); if (handlerFactories.Count == 1) //TODO: Multi-threading! { // 为消费者绑定一个路由键,在收到对应的事件时,就会触发之前绑定的方法。 Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); } return new EventHandlerFactoryUnregistrar(this, eventType, factory); }订阅的时候,除了 Consumer.BindAsync() 以外,基本流程和本地事件总线基本一致。
2.4.3 分布式事件的发布分布式事件总线一样重写了发布方法,内部首先使用 IRabbitMqSerializer 序列化器 (基于 JSON.NET) 将事件数据进行序列化,然后将消息投递出去。
public override Task PublishAsync(Type eventType, object eventData) { var eventName = EventNameAttribute.GetNameOrDefault(eventType); // 序列化事件数据。 var body = Serializer.Serialize(eventData); // 创建一个信道用于通讯。 using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel()) { channel.ExchangeDeclare( AbpRabbitMqEventBusOptions.ExchangeName, "direct", durable: true ); // 更改投递模式为持久化模式。 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; // 发布一个新的事件。 channel.BasicPublish( exchange: AbpRabbitMqEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body ); } return Task.CompletedTask; } 2.4.4 分布式事件的执行执行逻辑都存放在 ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea) 方法内部,基本就是监听到指定的消息,首先反序列化消息,调用父类的 TriggerHandlersAsync 去执行具体的事件处理器。
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea) { var eventName = ea.RoutingKey; var eventType = EventTypes.GetOrDefault(eventName); if (eventType == null) { return; } var eventData = Serializer.Deserialize(ea.Body, eventType); await TriggerHandlersAsync(eventType, eventData); } 三、总结ABP vNext 为我们实现了比较完善的本地事件总线,和基于 RabbitMQ 的分布式事件总线。在平时开发过程中,我们本地事件总线的使用频率应该还是比较高,而分布式事件总线目前仍处于一个半成品,很多高级特性还没实现,例如重试策略等。所以分布式事件总线要使用的话,建议使用较为成熟的 CAP 库替代 ABP vNext 的分布式事件总线。