MASA Framework - EventBus设计 (2)

MediatR 支持接口的方式去扫描事件订阅关系,举个例子:IRequestHandler<,>

public class PingHandler : IRequestHandler<Ping, string> { public Task<string> Handle(Ping request, CancellationToken cancellationToken) { return Task.FromResult("Pong"); } }

如果你的代码洁癖程度没有高的离谱,或许你希望是这样

public class NetHandler : IRequestHandler<Ping, string>, IRequestHandler<Telnet, string> { public Task<string> Handle(Ping request, CancellationToken cancellationToken) { return Task.FromResult("Pong"); } public Task<string> Handle(Telnet request, CancellationToken cancellationToken) { return Task.FromResult("Success"); } }

看着好像还行?如果很多呢?

那有没有办法解决这个问题?

特性!我们来看个例子

public class NetHandler { [EventHandler] public Task PingAsync(PingEvent @event) { //TODO } [EventHandler] public Task TelnetAsync(TelnetEvent @event) { //TODO } }

似乎我们找到了一个出路

多订阅者保序执行

通过事件层层推进确实可以满足顺序执行的场景,但如果你被大量无限套娃的事件包围的时候或许你需要另外一个出路,看下例子:

public class NetHandler { [EventHandler(0)] public Task PingAsync(PingEvent @event) { //TODO } [EventHandler(1)] public Task LogAsync(PingEvent @event) { //TODO } }

只要参数是同一个 Event 就会按照 EventHandler 的 Order 顺序执行。

Saga

那执行失败了怎么办,如果两个方法因为其中一个需要调用远程服务而无法跟本地事务结合,能帮我回滚吗?

来吧,SAGA 走起,帮你再做个取消动作,同时还支持重试机制,以及是否忽略当前步骤的取消动作。

我们先来预设一下场景:

调用 CheckBalanceAsync 来检查余额

调用 WithdrawAsync, 抛出 exception

重试 WithdrawAsync 3 次

调用 CancelWithdrawAsync

代码如下:

public class TransferHandler { [EventHandler(1)] public Task CheckBalanceAsync(TransferEvent @event) { //TODO } [EventHandler(2, FailureLevels.ThrowAndCancel, enableRetry: true, retryTimes: 3)] public Task WithdrawAsync(TransferEvent @event) { //TODO throw new Exception(); } [EventHandler(2, FailureLevels.Ignore, enableRetry: false, isCancel: true)] public Task CancelWithdrawAsync(TransferEvent @event) { //TODO } } AOP

举个业务场景,给所有 Command 在执行前增加一个参数验证

我们提供了 Middleware,允许像俄罗斯套娃一样(.Net Middleware)做横切关注点的相关的事情

public class LoggingMiddleware<TEvent> : IMiddleware<TEvent> where TEvent : notnull, IEvent { private readonly ILogger<LoggingMiddleware<TEvent>> _logger; public LoggingMiddleware(ILogger<LoggingMiddleware<TEvent>> logger) => _logger = logger; public async Task HandleAsync(TEvent @event, EventHandlerDelegate next) { _logger.LogInformation("----- Handling command {EventName} ({@Event})", typeof(TEvent).FullName, @event); await next(); } }

注册 DI

builder.Services.AddTransient(typeof(IMiddleware<>), typeof(LoggingMiddleware<>)) MASA EventBus 完整功能列表

接收事件

维护订阅关系 - 接口

维护订阅关系 - 特性

多订阅者顺序执行

转发事件

Saga

AOP

UoW

自动开启和关闭事务

Integration Event Bus

用于跨服务的 Event Bus,支持最终一致性,本地消息表

Pub/Sub

提供了 Pub Sub 接口,并基于 Dapr Pub/Sub 提供默认实现

本地消息表

提供了本地消息保存和 UoW 联动接口,并基于 EF Core 提供默认实现

使用方法

启用 Dapr Event Bus

builder.Services .AddDaprEventBus<IntegrationEventLogService>(options=> { options.UseUoW<CatalogDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=Password;database=test")) .UseEventLog<CatalogDbContext>(); ) });

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgyfzs.html