图解RocketMQ之如何实现顺序消息

大家好,我是苍何。

顺序消息是业务中常用的功能之一,而 RocketMQ 默认发送的事普通无序的消息,那该如何发送顺序消息呢?

要保证消息的顺序,要从生产端到 broker 消息存储,再到消费消息都要保证链路的顺序,才可以做到真正的顺序消息。

何为顺序

那苍何首先抛出一个「玄学」问题,何为顺序?

你肯定会用你聪明的大脑瓜子说"顺序不就是有序吗?有啥好说的"。不,那你就浅了,我从顺序的严格划分程度来说,可分为:

  • 普通顺序
  • 严格顺序

普通顺序是指的相同队列收到的消息是有序的(有时也叫局部有序)。这有个前提条件,必须消息在同一个队列,我们知道队列本身就是先进先出嘛,故而自带顺序。

普通顺序场景

严格顺序就好理解啦,我可不管你是哪个队列,哪个 topic,不管你在天涯海角的哪一台服务器,都要满足顺序(有时也叫全局有序)。

严格顺序消费

这一看就很难对不对,在地球上天涯海角的 2 台服务器,硬是要来保证顺序。

那如果是业务来分呢,顺序还可以分为:因果顺序时间顺序

凡事有因必有果,比如交易系统中订单创建、支付、退款等流程,先要创建订单才能支付,支付完成才能退款。这些步骤间有因果顺序

因果顺序

对于时间顺序步骤之间没有因果联系,只要满足先进先出的顺序,比如股票交易中,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

股票交易中的时间顺序

所以一个简单的玄学问题硬是可以整出好几个概念,哈哈,这就是计算机的魅力。总结下何为顺序吧:

何为顺序总结

顺序消息的使用场景

其实顺序消息的使用场景比较多,只要你的业务需要用消息来保证顺序就都用到顺序消息,你肯定会说"苍何,你这说了和没说一样😂"

别急,除了刚刚的股票交易场景,在 MySQL 数据库利用 binlog 消息来进行数据实时增量同步也是需要顺序消息的。

增量同步是指的两个不同数据库,当有一方增删改时,另一方也要同步进行增删改。

增量同步的目的是为了保证数据数据一致性,如果是普通消息,数据可能就乱了。

数据实时增量同步

就这两场景啊,也不多啊,别急,以下这些业务场景也都可用到顺序消息:

  • 库存管理:保证商品入库、出库、盘点等操作的顺序性,避免超卖或库存错误。
  • 事件溯源:在事件驱动架构中,保证事件按照发生的顺序被处理和记录。
  • 消息重试机制:在处理失败需要重试的场景中,保证重试消息的顺序。

顺序发送技术原理

关关难过,关关过。要保证顺序消息,第一关,当然是,先让生产者发送的消息是顺序的吧

在之前文章 [[图解RocketMQ之生产者如何进行消息重试]]中,说到了在 RocketMQ 中消息发送的 3 种方式:同步、异步、单向。

要保证发送消息的顺序,只能保证同步发送,且必须是单个生产者。这 2 个条件缺一不可。

究其原因还是因为顺序发送的技术原理,其技术原理也比较简单,就是要将同一类消息发送到相同队列即可。

RocketMQ 顺序消息的顺序关系是通过消息组(MessageGroup)判定和识别。发送消息的时候需要为每条消息设置 MessageGroup。那如何保证同一 MessageGroup 下同一业务 ID 的消息发送到指定相同的队列呢?

我们不妨看看 RocketMQ 的源码,在 SelectMessageQueueByHash 类中:

public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode();if (value < 0) {value = Math.abs(value);}value = value % mqs.size();return mqs.get(value);,}
}

在发送消息的时候,生产者可以使用这个选择器,来选择指定队列的消息;

SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderId);
  • orderId 指的就是业务 ID,取 orderId 的 hash 值(想通的业务 ID 具有想通的 hash 值)
  • 用哈希值和队列数 mqs.size() 取摸,得到一个索引值,结果会小于队列数
  • 根据索引值从队列列表中取出一个队列,那 hash 值相同,取出的队列也就相同。

通过发送消息的时候指定队列方式就保证了同一个业务 ID 发往相同的队列,也即保证消息发送的顺序性。

生产者保证消息顺序原理

队列列表的获取,Producer 会从 Nameserver 中获取对应 Topic 的 Broker 列表,并将结果缓存到本地,下次将直接从缓存中拿到结果。

顺序存储技术原理

我们在之前的[[图解RocketMQ之消息如何存储]]中有介绍通过 commitlog 存储全量的消息,且会按照 topic 和队列分配到 consumerQueue 中。

由于我们发送消息的时候指定了队列,那对于相同业务 ID 的消息,也会被存储到相同的 consumerQueue 中,且通常在实际项目中,同一个业务位于同一个消息组。

这样相同一笔订单,无论是创建、支付还是退款消息,都按顺序会被发送到相同的队列,不同的订单会被分配到不同的队列中。且消息存储也是按照顺序的。

顺序存储技术原理

这个时候问题很多的小明又问了,如果不同消息组的消息都发往 Topic 中的同一个队列,那这个时候存储的 consumerQueue 中也会有多个消息组的消息,如何保证顺序呢?

多个消息组发送同一个队列如何保证顺序性

这其实就扯到一个概念了,我们这里所说的顺序其实还只是分区顺序,也就是同一个消息组的消息在队列中是能保证顺序的,不同消息组的消息在同一个队列中无法保证顺序。

上图中的例子就是对于消息组 1 中的订单消息在 order-队列 1 中的消息是按照顺序存储的,同理消息组 3 中的消息 msg-1 和 msg-2 在 order-队列 1 中消息也是顺序的。

但消息组 1 和消息组 3 虽然都放在了同一个队列,但并不涉及到顺序。

顺序消费技术原理

RocketMQ 支持 2 种消费模式,集群消费广播消费

集群消费模式下每一条消息只会被 ConsumerGroup 分组下的一个 Consumer 消费,而广播消费模式下,每个 Consumer 都会消费这条消息。

多数场景下用的都是集群消费,也就是一次消费代表一次业务处理,每一条消息都将由集群中的一个实例来对应处理。

而顺序消费也叫有序消费,如果消息是顺序发送,且顺序存储,那理应消费也是一条条消费,这个用屁股想也知道,但实际却没这么简单。

在 Consumer 中不止一个线程在那消费,因为同一个消费者可能会处理不同的队列消息。如果只有一个线程。那不得慢死,实际上会有多个线程同时消费,对应的是 Consumer 中的消费线程池

多个线程消费同一条消息,如何防止消息被重复消费又是一大问题。

如果是你,会怎么做呢?

没错,就是加锁,在 RocketMQ 中用了 3 把锁来保证,分别是分布式锁SynchronizedReentrantLock

我们先来看看第一把锁:分布式锁。

顺序消费用的是 MessageListenerOrderly 来保证顺序消费,RocketMQ 默认已经提供了一个实现类 ConsumeMessageOrderlyService 。

这个 service 在启动的时候就会向 Broker 申请当前消费者负责的队列锁,会将自己的消费组、自己的客户端ID、以及负责的队列发往 Broker,Broker 就把对应的队列与这个消费者绑定,将这个关系存储在了本地。

分布式锁源码

加了这分布式锁,就可以保证在同一个消费组内,一个队列只能被一个 Consumer 消费。

这个分布式锁在 broker 中会过期,默认消费者每 20 s 去续签这把锁。

在 Consumer 中消费线程池会并发消费,分布式锁可管不到这,那就需要另外一把本地锁,那就是 Synchronized。

Synchronized 其实是为了保证同一个队列的消息只会被 Consumer 线程池中的一个线程所消费

最后终于费了九牛二虎之力获得了消费的资格,还不够,在消费内部逻辑中又加了一把更细粒度的 ReentrantLock 锁来标记队列还有消费者在消费。

特别注意在顺序消费时,如果有线程消费发生异常,会阻塞该队列中的其他消息,因为他拿着锁不妨,别的消费者依然也无法获取,之前说过有重试机制可以重试,直到超出最大重试次数,在这段期间内,该队列的消息都将会被阻塞。

实际顺序消息中最大重试次数要谨慎设置,防止消息大量堆积。

实战——如何发送和消费顺序消息

以 PmHub 项目中的顺序发和消费消息为例,我们来实战一波。

顺序发消息

public class OrderMessageProducer {private DefaultMQProducer producer;public OrderMessageProducer() throws Exception {producer = new DefaultMQProducer("order_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();}public void sendOrderMessage(String orderId, String status) throws Exception {Message msg = new Message("OrderTopic", status, orderId.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String orderId = (String) arg;int index = Math.abs(orderId.hashCode()) % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("Send order message: %s, status: %s, result: %s\n", orderId, status, sendResult);}public void shutdown() {producer.shutdown();}
}

在发消息时候,OrderMessageProducer 使用 MessageQueueSelector 来确保同一订单的消息被发送到同一个队列。且需要注意设置发送消息为同步发送(默认)。

顺序消费

public class OrderMessageConsumer {private DefaultMQPushConsumer consumer;public OrderMessageConsumer() throws Exception {consumer = new DefaultMQPushConsumer("order_consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("OrderTopic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String orderId = msg.getKeys();String status = new String(msg.getBody());System.out.printf("Consume order message: %s, status: %s\n", orderId, status);// 在这里处理订单状态更新的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Order message consumer started.");}public void shutdown() {consumer.shutdown();}
}

OrderMessageConsumer 使用 MessageListenerOrderly 来保证消息的顺序消费。

这样我们就实现了顺序生产和消费消息。

实际上企业级项目中,实现顺序消息需要考虑更为复杂,稍微一不注意就无法保证顺序性,且顺序消息的性能和队列数有很大关系,一般实际项目中都只会分区顺序即可

好啦,今天的分享结束。

我是苍何,这是图解 RocketMQ 教程的第 9 篇,我们下篇见~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392918.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】二维数组 数组名

二维数组名用途 1、查看所占内存空间 2、查看二维数组首地址 针对第一种用途&#xff0c;还可以计算数组有多少行、多少列、多少元素 针对第二种用途&#xff0c;数组元素、行数、列数都是连续的&#xff0c;且相差地址是有规律的 下面是一个实例 #include<iostream&g…

Spring源码解析(29)之AOP动态代理对象创建过程分析

一、前言 在上一节中我们已经介绍了在createBean过程中去执行AspectJAutoProxyCreator的after方法&#xff0c;然后去获取当前bean适配的advisor&#xff0c;如果还不熟悉的可以去看下之前的博客&#xff0c;接下来我们分析Spring AOP是如何创建代理对象的&#xff0c;在此之前…

【目标检测类】YOLOv5网络模型结构基本原理讲解

1. 基本概念 YOLOv5模型结构主要包括以下组成部分&#xff1a;‌ 输入端&#xff1a;‌YOLOv5的输入端采用了多种技术来增强模型的性能&#xff0c;‌包括Mosaic数据增强、‌自适应锚框计算、‌以及自适应图片缩放。‌这些技术有助于提高模型的泛化能力和适应不同尺寸的输入图…

MySQL基础操作全攻略:增删改查实用指南(中)

本节目标&#xff1a; NOT NULL - 指示某列不能存储 NULL 值。 UNIQUE - 保证某列的每行必须有唯一的值。 DEFAULT - 规定没有给列赋值时的默认值。 PRIMARY KEY - NOT NULL 和 UNIQUE 的结合。确保某列&#xff08;或两个列多个列的结合&#xff09;有唯一标 识&am…

【C++】模拟实现stack

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:实战项目集 ⚙️操作环境:Visual Studio 2022 ​ 目录 一.了解项目功能 &#x1f4cc;了解stack官方标准 &#x1f4cc;了解模拟实现stack 二.逐步实现项目功能模块及其逻辑详解 &#x1f4cc;实现stack成员变量 &…

【Linux】进程间通信(管道通信、共享内存通信)

一.什么是进程间通信 进程间通信这五个字很好理解&#xff0c;就是进程和进程之间通信。 那么为什么要有进程间通信呢&#xff1f; 1.数据传输&#xff1a;一个进程需要将它的数据发送给另一个进程 2.资源共享&#xff1a;多个进程之间共享同样的资源 3.通知事件&#xff1a;一…

遗传算法与深度学习实战——生命模拟与进化论

遗传算法与深度学习实战——生命模拟与进化论 0. 前言1. 模拟进化1.1 代码实现1.2 代码改进 2. 达尔文进化论3. 自然选择和适者生存3.1 适者生存3.2 进化计算中的生物学 小结系列链接 0. 前言 生命模拟通过计算机模拟生物体的基本特征、遗传机制、环境互动等&#xff0c;试图模…

WPF 依赖属性 IsHitTestVisible

IsHitlTestVisible 仅影响本身的元素&#xff08;含内部包含的子元素&#xff09;&#xff0c;不影响父元素效果&#xff0c;且事件会传递到父元素。 Eg&#xff1a; 如父元素有click事件&#xff0c; 子元素设置了IsHitTestVisiblefalse&#xff0c; 当鼠标单击这个子元素时&…

Android 埋点信息分析——内存篇

源码基于&#xff1a;Android U 0. 前言 在前一篇《Android statsd 埋点简析》一文中简单剖析了Android 埋点采集、传输的框架&#xff0c;本文在其基础对埋点信息进行解析&#xff0c;来看下Android 中埋下的内存信息有哪些。 1. 通过代码剖析google 埋点内容 1.1 PROCESS_M…

BootStrap前端面试常见问题

在前端面试中&#xff0c;关于Bootstrap的问题通常围绕其基本概念、使用方式、特性以及实际应用等方面展开。以下是一些常见的问题及其详细解答&#xff1a; 1. Bootstrap是哪家公司研发的&#xff1f; 回答&#xff1a;Bootstrap是由Twitter的Mark Otto和Jacob Thornton合作…

脊髓损伤小伙伴的活力重启秘籍! 让我们一起动起来,拥抱不一样的精彩生活✨

Hey小伙伴们~&#x1f44b; 今天咱们来聊聊一个超级重要又温暖的话题——脊髓损伤后的锻炼大法来啦&#xff01;&#x1f389; 记住&#xff0c;无论遇到什么挑战&#xff0c;我们都要像打不死的小强一样&#xff0c;活力满满地面对每一天&#xff01;&#x1f4aa; 首先&#…

2024实验班选拔考试(热身赛)

比赛传送门 邀请码&#xff1a;2024wksyb A. 简单的数列问题 签到&#xff0c;记得开long long。 #include<bits/stdc.h> #define rep(i,a,b) for (int ia;i<b;i) #define per(i,a,b) for (int ia;i>b;--i) #define se second #define fi first #define endl …

【C#语音文字互转】.NET的TTS文本转语音合成

官方文档给出环境为Visual Studio 2017及以上&#xff1b;C#SDK为.NET4.8及以上 本文章环境介绍&#xff1a; Visual Studio 2022&#xff1b;C#SDK为.NET6.0 语音转文字请移步&#xff1a;【C#语音文字互转】C#语音转文字&#xff08;方法一&#xff09; 一. 启动 Visual Stud…

【OceanBase系列】—— OceanBase应急三板斧

作者&#xff1a; 花名&#xff1a;洪波&#xff0c; OceanBase 数据库解决方案架构师 目前随着OceanBase数据库越来越流行&#xff0c;社区已经有很多用户在生产环境使用了OceanBase&#xff0c;也有不少用户的核心业务用到了OceanBase数据库&#xff0c;在使用OceanBase数据库…

新址·新征程|美创科技北京中心喜迎乔迁

7月30日&#xff0c;北京暴雨倾城 连绵大雨和隆隆雷声 却像是在为一场新征程洗礼 这一天&#xff0c;我们迎来了重要的时刻 ——美创科技北京中心搬新家啦&#xff01; 新址&#xff1a;北京市海淀区庚坊国际大厦6层 喜迎新址&#xff0c;一场简单但喜气盈盈、温馨十足的乔…

【Python学习手册(第四版)】学习笔记16-函数基础

个人总结难免疏漏&#xff0c;请多包涵。更多内容请查看原文。本文以及学习笔记系列仅用于个人学习、研究交流。 本文主要介绍Python中函数的基本概念&#xff0c;作用域以及参数传递&#xff0c;函数语法以及def和return语句的操作&#xff0c;函数调用表达式的行为&#xff…

Delphi5实现DLL的编写、调用

效果图 显式跟隐式调用差不多的&#xff0c;就重新画了窗体&#xff0c;画的有点粗糙。 DLL文件 DLL文件是一种包含了可执行代码的库文件&#xff0c;但它不能独立运行&#xff0c;必须由其他程序&#xff08;如EXE文件&#xff09;显式或隐式地加载并调用。DLL文件通常用于实…

全国地铁路线及站点SHP数据

数据是GIS的血液&#xff01; 我们在《126M全球手机基站SHP数据分享》一文中&#xff0c;为你分享过全球手机基站分布数据。 现在再为你分享全国地铁轻轨路线与站点SHP数据&#xff0c;你可以在文末查看该数据的领取方法。 全球地铁路线及站点数据 截至2023年12月31日&…

LAVIS在Mac,M1PRO芯片下的安装实战

LAVIS在Mac,M1PRO芯片下的安装实战 契机 ⚙ 本地想装个图片理解的大模型&#xff0c;看了下blip2感觉比较合适&#xff0c;macos安装的时候有点坑需要注意下&#xff0c;但是最终也无法使用mps加速&#xff0c;比较蛋疼。这里记录下安装步骤。 安装 LAVIS/projects/blip2 a…

【研发日记】Matlab/Simulink技能解锁(十二)——Stateflow中的两种状态机嵌套对比

文章目录 前言 项目背景 两级状态机 函数状态机 分析和应用 总结 参考资料 前言 见《【研发日记】Matlab/Simulink技能解锁(七)——两种复数移相算法》 见《【研发日记】Matlab/Simulink技能解锁(八)——分布式仿真》 见《【研发日记】Matlab/Simulink技能解锁(九)——基…