【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

痛点背景

业务场景

假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?

之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。

方案评估
  • 优点:是实现简单,缺点呢?
  • *缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
延时队列出现
  • 能够在指定时间间隔后触发某个业务操作
  • 能够应对业务数据量特别大的特殊场景

RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。

功能特点

  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • *broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

延时队列生产者端:

延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。

public class Producer {private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("111.231.110.149:9876");producer.start();for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest" ,"TagA" ,("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(3);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
复制代码
初始化

DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类

load方法

public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;
}
复制代码

ScheduleMessageService继承自ConfigManager类,super.load()方法对应

public boolean load() {String fileName = null;try {fileName = this.configFilePath();String jsonString = MixAll.file2String(fileName);if (null == jsonString || jsonString.length() == 0) {return this.loadBak();} else {this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();}
}
复制代码

延时队列源码分析:

先从延时消息延迟级别设置与broker端消息持久化入手。

具体实现

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

启动延迟消息定时任务

如果想要深入了解的可以看一下ScheduleMessageService这个类

内部变量含义

延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义

  • delayLevelTable定义了延迟级别和延迟时间的对应关系
  • *offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
复制代码

延迟消息投递

其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;
}
复制代码

核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90945.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RocketMQ双主双从同步集群部署

&#x1f388; 作者&#xff1a;互联网-小啊宇 &#x1f388; 简介&#xff1a; CSDN 运维领域创作者、阿里云专家博主。目前从事 Kubernetes运维相关工作&#xff0c;擅长Linux系统运维、开源监控软件维护、Kubernetes容器技术、CI/CD持续集成、自动化运维、开源软件部署维护…

Java接口压力测试—如何应对并优化Java接口的压力测试

导言 在如今的互联网时代&#xff0c;Java接口压力测试是评估系统性能和可靠性的关键一环。一旦接口不能承受高并发量&#xff0c;用户体验将受到严重影响&#xff0c;甚至可能导致系统崩溃。因此&#xff0c;了解如何进行有效的Java接口压力测试以及如何优化接口性能至关重要…

Linux系统USB摄像头测试程序(二)_读取配置

1、收先安装gtk3&#xff0c;我的测试机器是ubutn16.04&#xff0c;只要执行下面的安装命令就可以了 apt-get install libgtk-3-dev 使用下列命令验证是否安装好gtk3&#xff1a; pkg-config --cflags --libs gtk-3.0 2、显示结果类似如下&#xff1a; -pthre…

这是一篇关于SQL 脚本表间连接join的可视化说明

使用SQL合并两个数据集可以通过JOINS来完成。JOIN是查询的FROM子句中的SQL指令&#xff0c;用于标识要查询的表以及它们应该如何组合。 主键和外键 通常&#xff0c;在关系数据库中&#xff0c;数据被组织到由属性&#xff08;列&#xff09;和记录&#xff08;行&#xff09…

MySQL运维

日志 错误日志 show VARIABLES like %log_error%;使用 tail -f 错误文件路径 可以查看具体错误二进制日志 show variables like %log_bin%;在my.ini文件下的mysqlID下添加 log_binmysql-bin binlog-formatROW重启就开启binlog了 show VARIABLES like %binlog_format%;mys…

i18n 配置vue项目中英文语言包(中英文转化)

一、实现效果 二、下载插件创建文件夹 2.1 下载cookie来存储 npm install --save js-cookienpm i vue-i18n -S 2.2 封装组件多页面应用 2.3 创建配置语言包字段 三、示例代码 3.1 main.js 引用 i18n.js import i18n from ./lang// 实现语言切换:i18n处理element&#xff0c…

屏蔽socket 实例化时,握手阶段报错信息WebSocket connection to ‘***‘ failed

事情起因是这样的&#xff1a; 我们网站是需要socket链接实行实时推送服务&#xff0c;有恶意竞争对手通过抓包或者断网&#xff0c;获取到了我们的socket链接地址&#xff0c;那么他就可以通过java写一个脚本无限链接这个socket地址。形成dos攻击。使socket服务器资源耗尽&…

【运维】linkis安装dss保姆级教程与踩坑实践

文章目录 一. 安装准备二. 创建用户三. 准备安装包四. 修改配置1. 修改config.sh2. 修改db.sh 五、安装和使用1. 执行安装脚本2. 启动服务3. 查看验证是否成功 六. 报错处理报错一&#xff1a;The user is not logged in报错二&#xff1a;dss接口报错报错三&#xff1a;执行没…

算法随笔:图论问题之割点割边

割点 定义 割点的定义&#xff1a;如果一个点被删除之后会导致整个图不再是一个连通图&#xff0c;那么这个顶点就是这个图的割点。举例&#xff1a; 上图中的点2就是一个割点&#xff0c;如果它被删除&#xff0c;则整个图被分为两个连通分量&#xff0c;不再是一个连通图。…

vue3多条件搜索功能

搜索功能在后台管理页面中非常常见&#xff0c;本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…

突破大模型 | Alluxio助力AI大模型训练-成功案例(一)

更多详细内容可见《Alluxio助力AI大模型训练制胜宝典》 【案例一&#xff1a;知乎】多云缓存在知乎的探索:从UnionStore到Alluxio 作者&#xff1a;胡梦宇-知乎大数据基础架构开发工程师&#xff08;内容转载自InfoQ&#xff09; 一、背景 随着云原生技术的飞速发展&#xff…

ApacheCon - 云原生大数据上的 Apache 项目实践

Apache 软件基金会的官方全球系列大会 CommunityOverCode Asia&#xff08;原 ApacheCon Asia&#xff09;首次中国线下峰会将于 2023 年 8 月 18-20 日在北京丽亭华苑酒店举办&#xff0c;大会含 17 个论坛方向、上百个前沿议题。 字节跳动云原生计算团队在此次 CommunityOve…

Java多线程编程中的线程间通信

Java多线程编程中的线程间通信 基本概念&#xff1a; ​ 线程间通信是多线程编程中的一个重要概念&#xff0c;指的是不同线程之间如何协调和交换信息&#xff0c;以达到共同完成任务的目的。 线程间通信的目的 ​ 是确保多个线程能够按照一定的顺序和规则进行协作&#xff…

使用QT可视化设计对话框详细步骤与代码

一、创建对话框基本步骤 创建并初始化子窗口部件把子窗口部件放到布局中设置tab键顺序建立信号-槽之间的连接实现对话框中的自定义槽 首先前面三步在这里是通过ui文件里面直接进行的&#xff0c;剩下两步则是通过代码来实现 二、项目创建详细步骤 创建新项目 为项目命名 为…

计算机组成原理之地址映射

例1&#xff1a;某计算机主存容量256MB&#xff0c;按字编址&#xff0c;字长1B&#xff0c;块大小32B&#xff0c;Cache容量512KB。对如下的直接映射方式、4-路组相联映射方式、全相联映射方式的内存地址格式&#xff0c;求&#xff1a; &#xff08;1&#xff09;计算A、B、C…

案例研究|大福中国通过JumpServer满足等保合规和资产管理双重需求

“大福中国为了满足安全合规要求引入堡垒机产品&#xff0c;在对比了传统型堡垒机后&#xff0c;发现JumpServer使用部署更加灵活&#xff0c;功能特性丰富&#xff0c;能够较好地满足公司在等保合规和资产管理方面的双重需求。” ——大福&#xff08;中国&#xff09;有限公…

网络编程(8.14)TCP并发服务器模型

作业&#xff1a; 1. 多线程中的newfd&#xff0c;能否修改成全局&#xff0c;不行&#xff0c;为什么&#xff1f; 2. 多线程中分支线程的newfd能否不另存&#xff0c;直接用指针间接访问主线程中的newfd,不行&#xff0c;为什么&#xff1f; 多线程并发服务器模型原代码&…

YOLOv5基础知识入门(3)— 目标检测相关知识点

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。YOLO算法发展历程和YOLOv5核心基础知识学习完成之后&#xff0c;接下来我们就需要学习目标检测相关知识了。为了让大家后面可以顺利地用YOLOv5进行目标检测实战&#xff0c;本节课就带领大家学习一下目标检测的基础知识点&…

SpringBootWeb案例

通过该综合案例,我们就可以知道,在开发一个Web程序时,前端程序、后端程序以及数据库这三者之间是如何交互、如何协作的,而通过这个综合案例也需要掌握根据接口文档开发服务端接口的能力。 而这个案例呢,就是Tlias智能学习辅助系统。 产品经理所绘制的页面原型: 在这个案…

less基本使用

1 less中的变量 //对值进行声明 link-color: #ccc//定义变量名称 .{sleName} {}bg: background-color; //定义属性名称 .container {{bg}: red; }2 继承&#xff08;复用重复样式&#xff09; //继承必须位于选择器最后 //继承选择器名不能为变量 .a:hover:extend(.b) {}.a {…