动手造轮子:实现一个简单的 EventBus

动手轮子实现一个简单的 EventBus Intro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

event bus flow

微服务间使用 EventBus 实现系统间解耦:

event bus in microservice

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用MQ来实现 EventBus.

eventbus implement

为什么要使用 EventBus

解耦合(轻松的实现系统间解耦)

高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)

系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)

...

EventBus 整体架构:

IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

IEventBase

IEventHandler:定义了一个 Handle 方法来处理相应的事件

IEventHandler

IEventStore:所有的事件的处理存储,保存事件的IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

IEventStore

IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

IEventBus

使用示例

来看一个使用示例,完整代码示例:

internal class EventTest { public static void MainTest() { var eventBus = DependencyResolver.Current.ResolveService<IEventBus>(); eventBus.Subscribe<CounterEvent, CounterEventHandler1>(); eventBus.Subscribe<CounterEvent, CounterEventHandler2>(); eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>(); eventBus.Publish(new CounterEvent { Counter = 1 }); eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>(); eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>(); eventBus.Publish(new CounterEvent { Counter = 2 }); } } internal class CounterEvent : EventBase { public int Counter { get; set; } } internal class CounterEventHandler1 : IEventHandler<CounterEvent> { public Task Handle(CounterEvent @event) { LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}"); return Task.CompletedTask; } } internal class CounterEventHandler2 : IEventHandler<CounterEvent> { public Task Handle(CounterEvent @event) { LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}"); return Task.CompletedTask; } } 具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

public class EventStoreInMemory : IEventStore { private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>(); public bool AddSubscription<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { var eventKey = GetEventKey<TEvent>(); if (_eventHandlers.ContainsKey(eventKey)) { return _eventHandlers[eventKey].Add(typeof(TEventHandler)); } else { return _eventHandlers.TryAdd(eventKey, new HashSet<Type>() { typeof(TEventHandler) }); } } public bool Clear() { _eventHandlers.Clear(); return true; } public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase { if(_eventHandlers.Count == 0) return new Type[0]; var eventKey = GetEventKey<TEvent>(); if (_eventHandlers.TryGetValue(eventKey, out var handlers)) { return handlers; } return new Type[0]; } public string GetEventKey<TEvent>() { return typeof(TEvent).FullName; } public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase { if(_eventHandlers.Count == 0) return false; var eventKey = GetEventKey<TEvent>(); return _eventHandlers.ContainsKey(eventKey); } public bool RemoveSubscription<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { if(_eventHandlers.Count == 0) return false; var eventKey = GetEventKey<TEvent>(); if (_eventHandlers.ContainsKey(eventKey)) { return _eventHandlers[eventKey].Remove(typeof(TEventHandler)); } return false; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydxyd.html