在业务开发过程中,我们常常需要做一些定时任务,这些任务一般用来做监控或者清理任务,比如在订单的业务场景中,用户在创建订单后一段时间内,没有完成支付,系统将自动取消该订单,并将库存返回到商品中,又比如在微信中,用户发出红包24小时后,需要对红包进行检查,是否已领取完成,如未领取完成,将剩余金额退回到发送者钱包中,同时销毁该红包。
在项目初始阶段,或者是一些小型的项目中,常常采用定时轮询的方法进行检查,但是我们都知道,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据库造成巨大的访问压力。
下面,本文将演示如何使用一个 RabbitMQ 的死信队列同时监控多种业务(复合业务),达到模块解耦,释放压力的目的。
注意:名词“复合死信”是为了叙述方便临时创造的,如有不妥,欢迎指正
1. 什么是 RabbitMQ 死信队列DLX(Dead Letter Exchanges)死信交换,死信队列本身也是一个普通的消息队列,在创建队列的时候,通过设置一些关键参数,可以将一个普通的消息队列设置为死信队列,与其它消息队列不同的是,其入栈的消息根据入栈时指定的过期时间/被拒绝/超出队列长度被移除,依次被转发到指定的消息队列中进行二次处理。这样说法比较拗口,其原理就是死信队列内位于顶部的消息过期时,该消息将被马上发送到另外一个订阅者(消息队列)中。
其原理入下图
由上图可以看到,目前有三种类型的业务需要使用 DLX 进行处理,因为每个业务的超时时间不一致的问题,如果将他们都放入一个 DLX 中进行处理,将会出现一个时序的问题,即消息队列总数处理顶部的消息,如果顶部的消息未过期,而底部的消息过期,这就麻烦了,因为过期的消息无法得到消费,将会造成延迟;所以正常情况下,最好的办法是每个业务都独立一个队列,这样就可以保证,即将过期的消息总是处于队列的顶部,从而被第一时间处理。
但是多个 DLX 又带来了管理上面的问题,随着业务的增加,越来越多的业务需要进入不同的 DLX ,这个时候我们发现,由于人手不足的原因,维护这么多 DLX 实在是太吃力了,如果能将这些消息都接入一个 DLX 中该多好呀,在一个 DLX 中进行消息订阅,然后进行分发或者处理,这就非常有趣了。
下面就按照这个思路,我们进行集中处理,也就是复合死信交换 CDLX(Composite Dead Letter Exchanges)
2. 如何创建死信队列创建 DLX 队列的方式非常简单,我们使用 RabbitMQ Web 控制面板进行创建 Exhcange(交换机)/Consumer(死信消费队列)/cdlx(复合死信队列)
2.1 创建队列创建交换机 cdlx-Exchange
死信消费队列 cdlx-Consumer
复合死信队列 cdlx-Master
注意,这里添加死信队列必须同时设置死信转发交换机和路由,后续通过路由绑定实现消费队列
路由绑定
上面的路由绑定共有两个,分别是 Master 和 Consumer 用于消息路由到队列,为下面的业务消息做准备,建好后的队列如下
3.复合业务进入死信队列当建立好队列以后,我们就可以专心的处理业务了,下面就来模拟3种业务将消息发送到死信队列的过程
3.1 发送死信消息到队列发送消息使用了 Asp.NetCore轻松学-实现一个轻量级高可复用的RabbitMQ客户端 中的轻量客户端,封装后的发送消息代码如下
public class CdlxMasterService { private IConfiguration cfg = null; private ILogger logger = null; private string vhost = "test_mq"; private string exchange = "cdlx-Exchange"; private string routekey = "master"; private static MQConnection connection = null; private MQConnection Connection { get { if (connection == null || !connection.Connection.IsOpen) { connection = new MQConnection( cfg["rabbitmq:username"], cfg["rabbitmq:password"], cfg["rabbitmq:host"], Convert.ToInt32(cfg["rabbitmq:port"]), vhost, logger); } return connection; } } private static IModel channel = null; private IModel Channel { get { if (channel == null || channel.IsClosed) channel = Connection.Connection.CreateModel(); return channel; } } public void SendMessage(object data) { string message = JsonConvert.SerializeObject(data); this.Connection.Publish(this.Channel, exchange, routekey, message); } } 3.2 将 CdlxMasterService 注入到服务 public void ConfigureServices(IServiceCollection services) { services.AddSingleton<CdlxMasterService>(); ... } 3.3 模拟3种业务生产死信消息 public class HomeController : Controller { private CdlxMasterService masterService; public HomeController(CdlxMasterService masterService) { this.masterService = masterService; } [HttpGet("publish")] public int Publish() { Contract contract = new Contract(this.masterService); for (int i = 0; i < 10; i++) { contract.Publish(MessageType.RedPackage, "红包信息,超时时间1024s"); contract.Publish(MessageType.Order, "订单信息,超时时间2048s"); contract.Publish(MessageType.Vote, "投票信息,超时时间4096s"); } return 0; } }