如果想要深入了解的可以看一下ScheduleMessageService这个类
延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义
delayLevelTable定义了延迟级别和延迟时间的对应关系
offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start() 延迟消息投递其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1
public static int delayLevel2QueueId(final int delayLevel) { return delayLevel - 1; }核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown <= 0)中,看下代码
每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除
重新投递实现重新构建投递消息的关键点在于messageTimeup中,其构建了一个新的消息,并从延时消息属性中恢复出了原有的topic,queueId,再调用putMessage重新进行投递。
优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率
基本思路已经介绍完,梳理下延时消息实现思路producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
根据消费偏移量offset从commitLog中解析出对应消息
从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递