Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析 (2)

如果想要深入了解的可以看一下ScheduleMessageService这个类

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

内部变量含义

延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

delayLevelTable定义了延迟级别和延迟时间的对应关系

offsetTable存放延延迟级别对应的队列消费的offset

ScheduleMessageService.start()

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

延迟消息投递

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) { return delayLevel - 1; }

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown <= 0)中,看下代码

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除

重新投递实现

重新构建投递消息的关键点在于messageTimeup中,其构建了一个新的消息,并从延时消息属性中恢复出了原有的topic,queueId,再调用putMessage重新进行投递。

Alibaba-技术专区-RocketMQ 延迟消息实现原理和源码分析

总结

优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性

缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况

改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率

基本思路已经介绍完,梳理下延时消息实现思路

producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别

broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1

mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

根据消费偏移量offset从commitLog中解析出对应消息

从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zysygj.html