配置队列管理随程序启动停止
private MQServcieManager serviceManager; // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime) { serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>()); lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); }); lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); }); ... }实现消费队列
public class CdlxConsumerService : MQServiceBase { public override string vHost { get { return "test_mq"; } } public override string Exchange { get { return "cdlx-Exchange"; } } private string queue = "cdlx-Consumer"; private string routeKey = "all"; private List<BindInfo> bs = new List<BindInfo>(); public override List<BindInfo> Binds { get { return bs; } } public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger) { this.bs.Add(new BindInfo { ExchangeType = ExchangeType.Direct, Queue = this.queue, RouterKey = this.routeKey, OnReceived = this.OnReceived }); } private void OnReceived(MessageBody body) { var message = JsonConvert.DeserializeObject<CdlxMessage>(body.Content); Console.WriteLine("类型:{0}\t 内容:{1}\t进入时间:{2}\t过期时间:{3}", message.Type, message.Data, message.CreateTime, message.CreateTime.AddSeconds(message.Expire)); body.Consumer.Model.BasicAck(body.BasicDeliver.DeliveryTag, true); } }上面的代码,模拟了最终业务处理的过程,这里仅仅是简单演示,所以只是将消息打印到屏幕上;在实际的业务场景中,我们可以根据不同的 MessageType 进行消息的分发处理。
5. 消费过程演示为了比较直观的观看死信消费过程,我们编写一个简单的列表页面,自动刷新后去消费死信队列,然后将消息输出到页面上,通过观察此页面,我们可以实时了解到死信队列的消费过程,实际的业务场景中,大家可以利用第三方定时器定时调用接口实现,或者使用内置的轻量主机做后台任务实现定时轮询,具体参考 Asp.Net Core 轻松学-基于微服务的后台任务调度管理器
5.1 发布消息浏览器访问本机地址::5000/home/publish
下面将发布 30 条信息到 DLX 中,每个业务各 10 条信息。
通常情况下,红包的过期时间最短且超时时间一致,应该最快超时,意味着当第一条红包消息超时的时候,其余 9 条红包消息也会一并超时,但是由于红包消息混合的发布在队列中,且只有第一条红包消息位移队列顶部;所以,当第一条红包消息超时被消费后,其余 9 条红包由于不是位于队列顶部,虽然此时他们已经超时,但是 DLX 将无法处理;当我们使用 cdlx-timer(定时器)模拟调用 CdlxTimerService 的时候(也就是刷新首页), CdlxTimerService 服务将会对 DLX 进行检查。
查看消费状态
通过上图的观察得知,红色部分首先位于消息顶部被消费,然后就无法进行超时判断,接下来,由于使用了定时轮询,使得绿色部分消息得以浮动到消息顶部,然后被 DLX 进行处理后消费。
5.2 定时器检查死信队列浏览器访问本机地址::5000/home
上图的每一次刷新,都是对 DLX 的一次轮询检查,随着轮询的深入,所有处于队列中不同位置的超时消息都有机会浮动到队列顶部进行消费处理。
结束语业务的发展促进了架构的演进,每一个需求升级的背后,是程序员深深的思考;本文从 CDLX 的需求出发,充分利用了 RabbitMQ DLX 对消息检查的特性,实现了对复合业务的集中处理。
演示代码下载https://github.com/lianggx/Examples/tree/master/RabbitMQ.CDLX