消息中间件之RocketMQ源码分析(二十九)

延迟消息投递机制

RocketMQ在存储延迟消息时,将其保存在一个系统的Topic中,在创建ConsumeQueue时,tagCode字段中保存着延迟消息需要被投递的时间,通过这个存储实现的思路,我们可以总结出延迟消息的投递过程:通过定时服务定时扫描ConsumeQueue,满足投递时间条件的消息再通过
CommitLog将消息重新投递到原始的Topic中,消费者就可以接收消息了。

在存储模块初始化时,初始化延迟消息处理类ScheduleMessageService,通过依次调用start()方法来启动延迟消息定时扫描任务,start()方法核心逻辑如图
在这里插入图片描述

核心字段和方法

  • timer:定时检查延迟消息是否可以投递的定时器
  • delayLevelTable:该字段用于保存全部的延迟级别
  • level:延迟级别
  • timeDelay:延迟时间
  • offset:延迟级别对应的ConsumeQueue的消费位点,扫描时从这个位点开始
  • timeDelay:参数表示延迟时间

从代码中的for循环可以知道,每个延迟级别都有一个定时任务进行扫描,每个延迟级别在第一次扫描时会延迟1000ms,再开始执行扫描。随着延迟消息不断被重新投递,内置Topic的全部ConsumeQueue的消费位点offset不断向前推进,也会定时执行ScheduleMessageService.this.persist()方法来持久化消费位点,以便进程重启后从上次开始扫描检查。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

this.timer.schedule()定时任务只执行一次,那么之后发送的消息是如何进行投递的呢?

在DeliverDelayedMessageTimeTask.executeOnTimeup()方法中,DeliverDelayedMessageTimerTask类是ScheduleMessageService类的一个内部类,同时也是this.timer.schedule()方法的输入参数

核心属性和方法

  • delayLevel:延迟级别。

  • offset:待检查消息的ConsumeQueue的位点值

  • correctDeliverTimestamp():纠正投递时间
    在这里插入图片描述

  • executeOnTimeup():定时扫描核心方法
    DeliverDelayedMessageTimerTask默认执行run()方法,run()方法直接调用executeOnTimeup()方法扫描当前位点的消息是否满足投递条件

核心方法的执行步骤

第一步:查找Consume Queue.

其中涉及到了queueId2DelayLevel()和delayLevel2QueueId(),RocketMQ设计的延迟级别和延迟Topic的queueId有关系,可以进行互相转化

第二步:找到投递时间。

真正的投递时间deliverTimestamp被存储在
ConsumeQueue的tagCode中,所以我们可以通过offset查找ConsumeQueue中保存的deliverTimestamp,再通过调用correctDeliverTimestamp()计算当前消息的真正投递时间deliverTimestamp

第三步:如果满足投递时间条件,则重新发送消息到原始Topic中

在重新投递前调用messageTimeup()方法,将消息的原始Topic、
queueId、tagCode等还原,清除扩展字段中延迟消息的标志
(MessageConstant.PROPERTY_DELAY_TIME_LEVEL),然后被重新
投递、更新消费位点。重新投递后,消息会正常创建Consume Queue索引、IndexFile索引,然后被消费者拉取消费,达到定时消费的目的。

第四步:如果第三步投递失败,或者消息没有达到投递时间条件,则重新提交一个定时任务到timer中,以供下次检查

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/268809.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++入门07 数组、指针与字符串

图源:文心一言 听课笔记简单整理,供小伙伴们参考~🥝🥝 第1版:听课的记录代码~🧩🧩 编辑:梅头脑🌸 审核:文心一言 目录 🐳课程来源 &#x1…

逆序遍历字符串(不改变内存地址)

题目:逆序遍历字符串"ABCDEFG" 实现思路: 使用StringBuilder创建对象,因为String字符串是不可变的,而StringBuilder内部的方法没有被final关键字修饰,所以将s1的字符串内容传给StringBuilder创建的对象ret…

模拟算法题练习(一)(扫雷,灌溉,回文日期)

目录 模拟算法介绍: (一、扫雷) (二、灌溉) (三、回文日期) 有一说一这题大佬的题解是真的强 模拟算法介绍: 模拟算法通过模拟实际情况来解决问题,一般容易理解但是实…

c语言游戏实战(10):坤坤的篮球回避秀

前言: 这款简易版的球球大作战是博主耗时两天半完成的,玩家需要控制坤坤在游戏界面上移动,来躲避游戏界面上方不断掉下来的篮球。本游戏使用C语言和easyx图形库编写,旨在帮助初学者了解游戏开发的基本概念和技巧。 在开始编写代…

SpringMVC01、回顾MVC

1、回顾MVC 1.1、什么是MVC MVC是模型(Model)、视图(View)、控制器(Controller)的简写,是一种软件设计规范。是将业务逻辑、数据、显示分离的方法来组织代码。MVC主要作用是降低了视图与业务逻辑间的双向偶合。MVC不是一种设计模式,MVC是一种架构模式。…

Node.js中的并发和多线程处理

在Node.js中,处理并发和多线程是一个非常重要的话题。由于Node.js是单线程的,这意味着它在任何给定时间内只能执行一个任务。然而,Node.js的事件驱动和非阻塞I/O模型使得处理并发和多线程变得更加高效和简单。在本文中,我们将探讨…

恋爱话术小程序源码支持多种流量主模式

源码介绍 这就是一款恋爱话术小程序,该款小程序相对来说还是挺强大的 这款小程序基本分段都是和外面几千块几百块的分段是一样的,基本就是从开场-情绪-聊天-升级-邀约-约会等几大分类开始 然后每一大分类下面都有N个小分类来做识别 另外也支持输入对方的话或关键词获取相关的话…

Container killed on request. Exit code is 143

Bug信息 WARN YarnAllocator: Container marked as failed: container_e33_1480922439133_0845_02_000002 on host: hdp4. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 Killed by externa…

大数据技术(一)

大数据技术概述 大数据技术层面及其功能 数据采集与预处理 利用ETL(extract-transform-load)工具将分布的、异构数据源中的数据,如关系数据、平面数据文件等,抽取到临时中间层后进行清洗、转换、集成,最后加载到数据仓库或数据集市中&…

机器人 标准DH与改进DH

文章目录 1 建立机器人坐标系1.1 连杆编号1.2 关节编号1.3 坐标系方向2 标准DH(STD)2.1 确定X轴方向2.2 建模步骤2.3 变换顺序2.4 变换矩阵3 改进DH(MDH)3.1 确定X轴方向3.2 建模步骤3.3 变换顺序3.4 变换矩阵4 标准DH与改进DH区别5 Matlab示例参考链接1 建立机器人坐标系 1.1…

【Python】变量的引用

🚩 WRITE IN FRONT 🚩 🔎 介绍:"謓泽"正在路上朝着"攻城狮"方向"前进四" 🔎🏅 荣誉:2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4、2021|2222年获评…

微服务day04-基于Feign的远程调用

一.Feign的认识 是http客户端,因为使用RestTemplate存在一些问题:代码可读性差,参数配置费事,不够优雅… String url"http://userservice/user/"order.getUserId(); User userrestTemplate.getForObject(url,User.cla…

【AIGC】如何提高Prompt准确度

前言 随着人工智能的迅猛进展,AIGC(通用人工智能聊天工具)已成为多个行业中不可或缺的自然语言处理技术。Prompt作为AIGC系统的一项关键功能,在工具的有效运作中发挥了举足轻重的作用。本篇文章将深入探讨Prompt与AIGC之间的紧密…

OpenLayers线性渐变和中心渐变(径向渐变)

目录 1.前言2.添加一个面要素3.线性渐变3.1 第一个注意点3.2 第二个注意点 4.中心渐变(径向渐变)5.总结 1.前言 OpenLayers官网有整个图层的渐变示例,但是没有单个要素的渐变示例,我们这里来补充一下。OpenLayers中的渐变是通过fi…

PostgreSQL restartpoint 原理详解

背景 大部分人对 PG 的 checkpoint 机制会熟悉一点,但是对 restartpoint 却不太熟悉,网上介绍这方面的文章也比较少。因此,本文将以 PG 14.7 的社区代码为基础,介绍 PG 中的 restartpoint 机制。 原理介绍 什么是 restartpoint…

Docker的基础知识与应用技巧

文章目录 一.docekr简介二.docekr安装三.docker命令 一.docekr简介 Docker是一个开源的应用容器引擎,它可以让开发者打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的Linux或Windows操作系统的机器上。Docker基于轻量级虚拟化技术&a…

2024022701-信息安全(二)——密码学

密码学的基本概念 密码学(Cryptology): 研究信息系统安全保密的科学。 密码编码学(Cryptography): 研究对信息进行编码,实现对信息的隐蔽。 密码分析学(Cryptanalytics) : 研究加密消息的破译或消息的伪造。 消息被称为明文(Plaintext)。 用…

【CSS】(浮动定位)易忘知识点汇总

浮动特性 加了浮动之后的元素,会具有很多特性,需要我们掌握的. 1、浮动元素会脱离标准流(脱标:浮动的盒子不再保留原先的位置) 2、浮动的元素会一行内显示并且元素顶部对齐 注意: 浮动的元素是互相贴靠在一起的(不会有缝隙)&…

用于游戏开发的顶级 PYTHON 框架

一、说明 我们试图用python开发游戏,一旦产生这个念头,就伴随这样一个问题:当今用于构建游戏的领先 Python 框架有哪些?python下,支持游戏开发平台有哪些优势?我们在这篇博文中告诉你。 二、高级游戏平台简…

nginx使用详解--缓存

Nginx 是一个功能强大的 Web 服务器和反向代理服务器,它可以用于实现静态内容的缓存,缓存可以分为客户端缓存和服务端缓存。 客户端缓存 客户端缓存指的是浏览器缓存, 浏览器缓存是最快的缓存, 因为它直接从本地获取(但有可能需要发送一个协商缓存的请…