延迟消息投递机制
RocketMQ在存储延迟消息时,将其保存在一个系统的Topic中,在创建ConsumeQueue时,tagCode字段中保存着延迟消息需要被投递的时间,通过这个存储实现的思路,我们可以总结出延迟消息的投递过程:通过定时服务定时扫描ConsumeQueue,满足投递时间条件的消息再通过
CommitLog将消息重新投递到原始的Topic中,消费者就可以接收消息了。
在存储模块初始化时,初始化延迟消息处理类ScheduleMessageService,通过依次调用start()方法来启动延迟消息定时扫描任务,start()方法核心逻辑如图
核心字段和方法
- timer:定时检查延迟消息是否可以投递的定时器
- delayLevelTable:该字段用于保存全部的延迟级别
- level:延迟级别
- timeDelay:延迟时间
- offset:延迟级别对应的ConsumeQueue的消费位点,扫描时从这个位点开始
- timeDelay:参数表示延迟时间
从代码中的for循环可以知道,每个延迟级别都有一个定时任务进行扫描,每个延迟级别在第一次扫描时会延迟1000ms,再开始执行扫描。随着延迟消息不断被重新投递,内置Topic的全部ConsumeQueue的消费位点offset不断向前推进,也会定时执行ScheduleMessageService.this.persist()方法来持久化消费位点,以便进程重启后从上次开始扫描检查。
this.timer.schedule()定时任务只执行一次,那么之后发送的消息是如何进行投递的呢?
在DeliverDelayedMessageTimeTask.executeOnTimeup()方法中,DeliverDelayedMessageTimerTask类是ScheduleMessageService类的一个内部类,同时也是this.timer.schedule()方法的输入参数
核心属性和方法
-
delayLevel:延迟级别。
-
offset:待检查消息的ConsumeQueue的位点值
-
correctDeliverTimestamp():纠正投递时间
-
executeOnTimeup():定时扫描核心方法
DeliverDelayedMessageTimerTask默认执行run()方法,run()方法直接调用executeOnTimeup()方法扫描当前位点的消息是否满足投递条件
核心方法的执行步骤
第一步:查找Consume Queue.
其中涉及到了queueId2DelayLevel()和delayLevel2QueueId(),RocketMQ设计的延迟级别和延迟Topic的queueId有关系,可以进行互相转化
第二步:找到投递时间。
真正的投递时间deliverTimestamp被存储在
ConsumeQueue的tagCode中,所以我们可以通过offset查找ConsumeQueue中保存的deliverTimestamp,再通过调用correctDeliverTimestamp()计算当前消息的真正投递时间deliverTimestamp
第三步:如果满足投递时间条件,则重新发送消息到原始Topic中
在重新投递前调用messageTimeup()方法,将消息的原始Topic、
queueId、tagCode等还原,清除扩展字段中延迟消息的标志
(MessageConstant.PROPERTY_DELAY_TIME_LEVEL),然后被重新
投递、更新消费位点。重新投递后,消息会正常创建Consume Queue索引、IndexFile索引,然后被消费者拉取消费,达到定时消费的目的。