看eShopOnContainers学一个EventBus (2)

另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:

private async Task ProcessEvent(string eventName, string message) { // 从manager查询信息 if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { // 从manager获取处理器 var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { // Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理) if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } } IEventBusSubscriptionsManager的默认实现

在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager类

这个类中有两个重要的字段

private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; private readonly List<Type> _eventTypes;

他们分别存储了事件列表和时间处理器信息词典

接下来就是实现一个

基于内存的事件总线

我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了

先贴出最终代码:

public class InMemoryEventBus : IEventBus { private readonly IServiceProvider _provider; private readonly ILogger<InMemoryEventBus> _logger; private readonly ISubscriptionsManager _manager; private readonly IList<IntegrationEvent> _events; public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; } public void Publish(IntegrationEvent e) { var eventType = e.GetType(); var handlers = _manager.GetHandlersForEvent(eventType.FullName); foreach (var handlerInfo in handlers) { var handler = _provider.GetService(handlerInfo.HandlerType); var method = handlerInfo.HandlerType.GetMethod("Handle"); method.Invoke(handler, new object[] { e }); } } public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); } public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } }

首先构造函数中声明我们要使用的东西:

public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; }

这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的

public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); }

订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。

接下来就是最重要的Publish方法,实现Publish有两种方式:

使用额外的线程和Queue让发布和处理异步

为了简单起见,我们先写个简单易懂的同步的

public void Publish(IntegrationEvent e)
{
// 首先要拿到集成事件的Type信息
var eventType = e.GetType();

// 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfo var handlers = _manager.GetHandlersForEvent(eventType.FullName); // 不解释循环 foreach (var handlerInfo in handlers) { // 从DI中获取类型的实例 var handler = _provider.GetService(handlerInfo.HandlerType); // 拿到Handle方法 var method = handlerInfo.HandlerType.GetMethod("Handle"); // 调用方法 await (Task) method.Invoke(handler, new object[] { e }); }

}

OK,我们的InMemoryEventBus就写好了!

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywxxp.html