RocketMQ源码 — 九、 RocketMQ延时消息

上一节消息重试里面提到了重试的消息可以被延时消费,其实除此之外,用户发送的消息也可以指定延时时间(更准确的说是延时等级),然后在指定延时时间之后投递消息,然后被consumer消费。阿里云的ons还支持定时消息,而且延时消息是直接指定延时时间,其实阿里云的延时消息也是定时消息的另一种表述方式,都是通过设置消息被投递的时间来实现的,但是Apache RocketMQ在版本4.2.0中尚不支持指定时间的延时,只能通过配置延时等级和延时等级对应的时间来实现延时。

一个延时消息被发出到消费成功经历以下几个过程:

设置消息的延时级别delayLevel

producer发送消息

broker收到消息在准备将消息写入存储的时候,判断是延时消息则更改Message的topic为延时消息队列的topic,也就是将消息投递到延时消息队列

有定时任务从延时队列中读取消息,拿到消息后判断是否达到延时时间,如果到了则修改topic为原始topic。并将消息投递到原始topic的队列

consumer像消费其他消息一样从broker拉取消息进行消费

注意:批量消息是不支持延时消息的

tips:下文中说到的延时队列可以理解为一个ConsumeQueue

producer发送延时消息

在producer中发送消息的时候,设置Message的delayLevel

// org.apache.rocketmq.common.message.Message public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxgff.html