好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:
// IEventSubscriber public interface IEventSubscriber : IDisposable { void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent>; } // PassThroughEventBus public sealed class PassThroughEventBus : IEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly ILogger logger; private readonly IEventHandlerExecutionContext context; public PassThroughEventBus(IEventHandlerExecutionContext context, ILogger<PassThroughEventBus> logger) { this.context = context; this.logger = logger; logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed; } private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => await this.context.HandleEventAsync(e.Event); public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent => Task.Factory.StartNew(() => eventQueue.Push(@event)); public void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent> { if (!this.context.HandlerRegistered<TEvent, TEventHandler>()) { this.context.RegisterHandler<TEvent, TEventHandler>(); } } #region IDisposable Support private bool disposedValue = false; // To detect redundant calls void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { this.eventQueue.EventPushed -= EventQueue_EventPushed; logger.LogInformation($"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."); } disposedValue = true; } } public void Dispose() => Dispose(true); #endregion } // Startup.cs public void ConfigureServices(IServiceCollection services) { this.logger.LogInformation("正在对服务进行配置..."); services.AddMvc(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"], serviceProvider.GetRequiredService<ILogger<DapperEventStore>>())); var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, sc => sc.BuildServiceProvider()); services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext); services.AddSingleton<IEventBus, PassThroughEventBus>(); this.logger.LogInformation("服务配置完成,已注册到IoC容器!"); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env) { var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseMvc(); }代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:
小结本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。
源代码的使用