动手造轮子:实现一个简单的 EventBus (2)

EventBus 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandler 的 Handle 方法,代码如下:

/// <summary> /// EventBus in process /// </summary> public class EventBus : IEventBus { private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>(); private readonly IEventStore _eventStore; private readonly IServiceProvider _serviceProvider; public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null) { _eventStore = eventStore; _serviceProvider = serviceProvider ?? DependencyResolver.Current; } public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase { if (!_eventStore.HasSubscriptionsForEvent<TEvent>()) { return false; } var handlers = _eventStore.GetEventHandlerTypes<TEvent>(); if (handlers.Count > 0) { var handlerTasks = new List<Task>(); foreach (var handlerType in handlers) { try { if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler) { handlerTasks.Add(handler.Handle(@event)); } } catch (Exception ex) { Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}"); } } handlerTasks.WhenAll().ConfigureAwait(false); return true; } return false; } public bool Subscribe<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { return _eventStore.AddSubscription<TEvent, TEventHandler>(); } public bool Unsubscribe<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { return _eventStore.RemoveSubscription<TEvent, TEventHandler>(); } } 项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

定义 Event 以及 EventHandler

public class NoticeViewEvent : EventBase { public Guid NoticeId { get; set; } // UserId // IP // ... } public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent> { public async Task Handle(NoticeViewEvent @event) { await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext => { var notice = await dbContext.Notices.FindAsync(@event.NoticeId); notice.NoticeVisitCount += 1; await dbContext.SaveChangesAsync(); }); } }

这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

注册 EventBus 相关服务以及 EventHandlers

services.AddSingleton<IEventBus, EventBus>(); services.AddSingleton<IEventStore, EventStoreInMemory>(); //register EventHandlers services.AddSingleton<NoticeViewEventHandler>();

订阅事件

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus) { eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>(); // ... }

发布事件

eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId }); Reference

https://github.com/dotnet-architecture/eShopOnContainers

https://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events

https://github.com/sheng-jie/EventBus

https://github.com/WeihanLi/ActivityReservation

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydxyd.html