ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便订阅该事件消息的消费者能够对消息数据做进一步处理。让我们回顾一下微服务之间通信的几种方式,分为同步和异步两种。同步通信最常见的就是RESTful API,而且非常简单轻量,一个Request/Response回环就结束了;异步通信最常见的就是通过消息渠道,将载有特殊意义的数据的事件消息发送到消息渠道,而对某种类型消息感兴趣的消费者,就可以获取消息中所带信息并执行相应操作,这也是我们比较熟知的事件驱动架构的一种表现形式。虽然事件驱动型架构看起来非常复杂,从微服务的实现来看显得有些繁重,但它的应用范围确实很广,也为服务间通信提供了新的思路。了解DDD的朋友相信一定知道CQRS体系结构模式,它就是一种事件驱动型架构。事实上,实现一套完整的、安全的、稳定的、正确的事件驱动架构并不简单,由于异步特性带来的一致性问题会非常棘手,甚至需要借助一些基础结构层工具(比如关系型数据库,不错!只能是关系型数据库)来解决一些特殊问题。本文就打算带领大家一起探探路,基于ASP.NET Core Web API实现一个相对比较简单的事件驱动架构,然后引出一些有待深入思考的问题,留在今后的文章中继续讨论。或许,本文所引入的源代码无法直接用于生产环境,但我希望本文介绍的内容能够给到读者一些启发,并能够帮助解决实际中遇到的问题。

术语约定

本文会涉及一些相关的专业术语,在此先作约定:

事件:在某一特定时刻发生在某件事物上的一件事情,例如:在我撰写本文的时候,电话铃响了

消息:承载事件数据的实体。事件的序列化/反序列化和传输都以消息的形式进行

消息通信渠道:一种带有消息路由功能的数据传输机制,用以在消息的派发器和订阅器之间进行数据传输

注意:为了迎合描述的需要,在下文中可能会混用事件和消息两个概念。

一个简单的设计

先从简单的设计开始,基本上事件驱动型架构会有事件消息(Events)、事件订阅器(Event Subscriber)、事件派发器(Event Publisher)、事件处理器(Event Handler)以及事件总线(Event Bus)等主要组件,它们之间的关系大致如下:

class_diagram

首先,IEvent接口定义了事件消息(更确切地说,数据)的基本结构,几乎所有的事件都会有一个唯一标识符(Id)和一个事件发生的时间(Timestamp),这个时间通常使用UTC时间作为标准。IEventHandler定义了事件处理器接口,显而易见,它包含两个方法:CanHandle方法,用以确定传入的事件对象是否可被当前处理器所处理,以及Handle方法,它定义了事件的处理过程。IEvent和IEventHandler构成了事件处理的基本元素。

然后就是IEventSubscriber与IEventPublisher接口。前者表示实现该接口的类型为事件订阅器,它负责事件处理器的注册,并侦听来自事件通信渠道上的消息,一旦所获得的消息能够被某个处理器处理,它就会指派该处理器对接收到的消息进行处理。因此,IEventSubscriber会保持着对事件处理器的引用;而对于实现了IEventPublisher接口的事件派发器而言,它的主要任务就是将事件消息发送到消息通信渠道,以便订阅端能够获得消息并进行处理。

IEventBus接口表示消息通信渠道,也就是大家所熟知的消息总线的概念。它不仅具有消息订阅的功能,而且还具有消息派发的能力,因此,它会同时继承于IEventSubscriber和IEventPublisher接口。在上面的设计中,通过接口分离消息总线的订阅器和派发器的角色是很有必要的,因为两种角色的各自职责不一样,这样的设计同时满足SOLID中的SRP和ISP两个准则。

基于以上基础模型,我们可以很快地将这个对象关系模型转换为C#代码:

public interface IEvent { Guid Id { get; } DateTime Timestamp { get; } } public interface IEventHandler { Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default); bool CanHandle(IEvent @event); } public interface IEventHandler<in T> : IEventHandler where T : IEvent { Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default); } public interface IEventPublisher : IDisposable { Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; } public interface IEventSubscriber : IDisposable { void Subscribe(); } public interface IEventBus : IEventPublisher, IEventSubscriber { }

短短30行代码,就把我们的基本对象关系描述清楚了。对于上面的代码我们需要注意以下几点:

这段代码使用了C# 7.1的新特性(default关键字)

Publish以及Handle方法被替换为支持异步调用的PublishAsync和HandleAsync方法,它们会返回Task对象,这样可以方便使用C#中async/await的编程模型

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsppwj.html