RabbitMQ死信队列另类用法之复合死信 (2)

上面的接口 puhlish 模拟了业务消息,由于我们依次发布了 红包/订单/投票 消息,所以迭代发布 10 次后,正好形成了一个时序错乱的信息队列,按照自动过期时序计算,当第一个红包超时到达时,第四条消息(红包)也会接着超时,可是由于此时订单和投票消息位于红包消息上面,该红包消息在达到超时时间后并不会被投递到 Consumer 消费队列,这是正确的,我们确实也是希望是这个结果

如果有一个办法把超时的消息自动将其提升到队列顶部就好了!

4. 处理复合死信

在 RabbitMQ 提供的 API 接口中,没有什么直接可用的能将死信队列中超时消息提升到顶部的好办法;但是,我们可以利用部分 API 接口的特性来完成这件事情。

4.1 定时消费客户端

下面,我们将使用一个定时消费客户端来完成对死信队列的轮询,充分利用 RabbitMQ 的消费特性来完成超时消息的位置提升。

过程如下图:

RabbitMQ死信队列另类用法之复合死信

如上图所示,我们增加一个 dlx-timer 定时器,定时的发起对死信队列的消费,该消费者仅仅是消费,不确认消息,也就是不做 ack,然后将消息重新置入队列中;这个过程,就是将消息不断提升位置的过程。

4.2 定时消费客户端实现代码 public class CdlxTimerService : MQServiceBase { public override string vHost { get { return "test_mq"; } } public override string Exchange { get { return "cdlx-Exchange"; } } public override List<BindInfo> Binds => new List<BindInfo>(); private string queue = "cdlx-Master"; public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger) { } /// <summary> /// 检查死信队列 /// </summary> /// <returns></returns> public List<CdlxMessage> CheckMessage() { long total = 0; List<CdlxMessage> list = new List<CdlxMessage>(); var connection = base.CreateConnection(); using (IModel channel = connection.Connection.CreateModel()) { bool latest = true; while (latest) { BasicGetResult result = channel.BasicGet(this.queue, false); total++; latest = result != null; if (latest) { var json = Encoding.UTF8.GetString(result.Body); list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json)); } } channel.Close(); connection.Close(); } return list; } }

上面的代码首先在定时调用到来的时候,创建了一个 Connection,然后利用此 Connection 创建了了一个 Channel,紧接着,使用该 Channel 调用 BasicGet 方法,获得队列顶部的信息,且设置 autoAck=false,表示仅检查消息,不确认,然后进入一个 while 迭代过程,一直读取到队列底部,获得所有队列中的信息,最后,关闭了通道释放连接。

这样,就完成了一次消息检查的过程,在调用 BasicGet 后,下一条信息将会出现在队列的顶部,同步,队列将自动对该消息进行超时检查,由于我们在调用 BasicGet 的时候,传入 autoAck=false,不确认该消息,在 RabbitMQ 控制台中,将显示为 unacted,所以在释放连接后,所有消息将会被重新置入队列中,这是一个自动的过程,无需我们做额外的工作。

4.3 Consumer(死信消费队列)最终处理业务

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjffd.html