RocketMQ 延迟消息

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/87126.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

推荐 4 个 yyds 的 GitHub 项目

本期推荐开源项目目录: 1. 开源的 Markdown 编辑器 2. MetaGPT 3. SuperAGI 4. 一个舒适的笔记平台 01 开源的 Markdown 编辑器 Cherry 是腾讯开源的 Markdown 编辑器,基于 Javascript具有轻量简洁、易于扩展等特点, 它可以运行在浏览器或服…

【C语言】进阶指针,超详解,含丰富代码示例

文章目录 前言指针进阶的重点内容1.字符指针2.数组指针3.指针数组4.函数指针5.函数指针数组6. 指向函数指针数组的指针 总结 这里是初阶的链接,方便大家对照查看!!!添加链接描述 前言 大家好呀,今天和大家将指针进阶…

【Linux】网络基础2

文章目录 网络基础21. 应用层1.1 协议1.2 HTTP 协议1.2.1 URL1.2.2 urlencode和urldecode1.2.3 HTTP协议格式1.2.4 HTTP的方法1.2.5 HTTP的状态码1.2.6 HTTP 常见的header1.2.7 最简单的HTTP服务器 2. 传输层2.1 端口号2.1.1 端口号范围划分2.1.2 认识知名端口号2.1.3 netstat2…

RISC-V走向开放服务器规范

原文:RISC-V Moving Toward Open Server Specification 作者:Agam Shah 转载自:https://www.hpcwire.com/2023/07/24/risc-v-moving-toward-open-server-specification/ 中文翻译: 2023年7月24日 RISC-V International目前正…

Android之消除APP图标的白色边框

有问题的效果: 解决方案: 第一步:app右键—>new—>Image Asset 第二步:上传Logo图标,选择每种分辨率,预览看效果,选择Resize,可以微调 第三步:点击 Next&#xff…

pytest-xdist分布式测试原理浅析

目录 pytest-xdist执行流程: pytest-xdist 模块结构: pytest-xdist分布式测试原理: pytest-xdist源码浅读: pytest-xdist执行流程: 解析命令行参数:pytest-xdist 会解析命令行参数,获取用户…

K8s环境下监控告警平台搭建及配置

Promethues是可以单机搭建的,参考prometheus入门[1] 本文是就PromethuesGrafana在K8s环境下的搭建及配置 Prometheus度量指标监控平台简介 启动minikube minikube start 安装helm 使用Helm Chart 安装 Prometheus Operator: helm install prometheus-operator stabl…

idea找不到DataBase

一、我想把数据库跟我的idea链接,结果发现找不到。如图。 二、解决方案 找到 file ---setting 找到plugin------找到marketplace 我的已经出现了

Jmeter入门之digest函数 jmeter字符串连接与登录串加密应用

登录请求中加密串是由多个子串连接,再加密之后传输。 参数连接:${var1}${var2}${var3} 加密函数:__digest (函数助手里如果没有该函数,请下载最新版本的jmeter5.0) 函数助手:Options > …

EMQX物联网竟然用这个?(一)——简介

一、前言 我们这些年,“物联网”这个名称越来越被大家所知道了。 物联网 (Internet of things),简称 IoT,这个概念在1991年就被漂亮国提出来了,解释一下就是万物可以通过互联网连接起来,可以进…

通用FIR滤波器的verilog实现(内有Lowpass、Hilbert参数生成示例)

众所周知,Matlab 中的 Filter Designer 可以直接生成 FIR 滤波器的 verilog 代码,可以方便地生成指定阶数、指定滤波器参数的高通、低通、带通滤波器,生成的 verilog 代码也可以指定输入输出信号的类型和位宽。然而其生成的代码实在算不上美观…

掌握 JVM 调优命令

常用命令 1、jps查看当前 java 进程2、jinfo实时查看和调整 JVM 配置参数3、jstat查看虚拟机统计信息4、jstack查看线程堆栈信息5、jmap查看堆内存的快照信息 JVM 日常调优总结起来就是:首先通过 jps 命令查看当前进程,然后根据 pid 通过 jinfo 命令查看…

c语言——完数的计算

完数即所有因子之和等于其本身值 列入,28124714,28所有的因子为1,2,4,7,14 而这五个因子之和恰好也是28. //完数的计算 /*完数即所有因子之和等于其本身值 列入,28124714,28所有的…

取证--理论

资料: 各比赛 Writeup : https://meiyacup.cn/Mo_index_gci_36.html 哔站比赛复盘视频: https://space.bilibili.com/453117423?spm_id_from333.337.search-card.all.click 自动分析取证四部曲 新建案例添加设备自动取证制作报告 取证大…

图片预览插件vue-photo-preview的使用

移动端项目中需要图片预览的功能,但本身使用mintui,vantui中虽然也有,但是为了一个组件安装这个有点儿多余,就选用了vue-photo-preview插件实现(其实偷懒也不想自己写)。 1、安装 npm i vue-photo-preview…

宋浩高等数学笔记(十一)曲线积分与曲面积分

个人认为同济高数乃至数学一中最烧脑的一章。。。重点在于计算方式的掌握,如果理解不了可以暂时不强求,背熟积分公式即可。此外本贴暂时忽略两类曲面积分之间的联系,以及高斯公式的相关内容,日后会尽快更新,争取高效率…

安装Qt选择组件

最近在做Qt相关的开发,首先搭建开发环境,刚开始对组件这块不是很熟悉,需要了解这方面的知识,写下来主要是方便记住关于选择组件的说明,Qt版本是最新的长期维护版本,版本号:6.5.2 一、选择要安装…

C# Linq源码分析之Take方法

概要 Take方法作为IEnumerable的扩展方法,具体对应两个重载方法。本文主要分析第一个接收整数参数的重载方法。 源码解析 Take方法的基本定义 public static System.Collections.Generic.IEnumerable Take (this System.Collections.Generic.IEnumerable source…

Easys Excel的表格导入(读)导出(写)-----java

一,EasyExcel官网: 可以学习一些新知识: EasyExcel官方文档 - 基于Java的Excel处理工具 | Easy Excel 二,为什么要使用easyexcle excel的一些优点和缺点 java解析excel的框架有很多 : poi jxl,存在问题:非常的消耗内存, easyexcel 我们…

Vue项目npm run dev 启动报错TypeError: Cannot read property ‘upgrade‘ of undefined

vue项目启动报错 TypeError: Cannot read property upgrade of undefined 由于我的vue.config.js文件 里面的代理target为空导致的 修改: 结果就可以正常运行了 参考原文: vue项目运行时报Cannot read property ‘upgrade’ of undefined错误_cannot r…