ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现 (2)

由于我们的这个模型可以作为实现消息系统的通用模型,并且会需要用到ASP.NET Core的项目中,因此,建议将这些接口的定义放在一个独立的NetStandard的Class Library中,方便今后重用和扩展

OK,接口定义好了。实现呢?下面,我们实现一个非常简单的消息总线:PassThroughEventBus。在今后的文章中,我还会介绍如何基于RabbitMQ和Azure Service Bus实现不一样的消息总线。

PassThroughEventBus

顾名思义,PassThroughEventBus表示当有消息被派发到消息总线时,消息总线将不做任何处理与路由,而是直接将消息推送到订阅方。在订阅方的事件监听函数中,会通过已经注册的事件处理器对接收到的消息进行处理。整个过程并不会依赖于任何外部组件,不需要引用额外的开发库,只是利用现有的.NET数据结构来模拟消息的派发和订阅过程。因此,PassThroughEventBus不具备容错和消息重发功能,不具备消息存储和路由功能,我们先实现这样一个简单的消息总线,来体验事件驱动型架构的设计过程。

我们可以使用.NET中的Queue或者ConcurrentQueue等基本数据结构来作为消息队列的实现,与这些基本的数据结构相比,消息队列本身有它自己的职责,它需要在消息被推送进队列的同时通知调用方。当然,PassThroughEventBus不需要依赖于Queue或者ConcurrentQueue,它所要做的事情就是模拟一个消息队列,当消息推送进来的时候,立刻通知订阅方进行处理。同样,为了分离职责,我们可以引入一个EventQueue的实现(如下),从而将消息推送和路由的职责(基础结构层的职责)从消息总线中分离出来。

internal sealed class EventQueue { public event System.EventHandler<EventProcessedEventArgs> EventPushed; public EventQueue() { } public void Push(IEvent @event) { OnMessagePushed(new EventProcessedEventArgs(@event)); } private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e); }

EventQueue中最主要的方法就是Push方法,从上面的代码可以看到,当EventQueue的Push方法被调用时,它将立刻触发EventPushed事件,它是一个.NET事件,用以通知EventQueue对象的订阅者,消息已经被派发。整个EventQueue的实现非常简单,我们仅专注于事件的路由,完全没有考虑任何额外的事情。

接下来,就是利用EventQueue来实现PassThroughEventBus。毫无悬念,PassThroughEventBus需要实现IEventBus接口,它的两个基本操作分别是Publish和Subscribe。在Publish方法中,会将传入的事件消息转发到EventQueue上,而Subscribe方法则会订阅EventQueue.EventPushed事件(.NET事件),而在EventPushed事件处理过程中,会从所有已注册的事件处理器(Event Handlers)中找到能够处理所接收到的事件,并对其进行处理。整个流程还是非常清晰的。以下便是PassThroughEventBus的实现代码:

public sealed class PassThroughEventBus : IEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly IEnumerable<IEventHandler> eventHandlers; public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers) { this.eventHandlers = eventHandlers; } private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => (from eh in this.eventHandlers where eh.CanHandle(e.Event) select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event)); public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent => Task.Factory.StartNew(() => eventQueue.Push(@event)); public void Subscribe() => eventQueue.EventPushed += EventQueue_EventPushed; #region IDisposable Support private bool disposedValue = false; // To detect redundant calls void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { this.eventQueue.EventPushed -= EventQueue_EventPushed; } disposedValue = true; } } public void Dispose() => Dispose(true); #endregion }

实现过程非常简单,当然,从这些代码也可以更清楚地了解到,PassThroughEventBus不做任何路由处理,更不会依赖于一个基础结构设施(比如实现了AMQP的消息队列),因此,不要指望能够在生产环境中使用它。不过,目前来看,它对于我们接下来要讨论的事情还是会很有帮助的,至少在我们引入基于RabbitMQ等实现的消息总线之前。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsppwj.html