三种使用 RocketMQ 达到消息一致的最佳实践

引言

Hi 你好,我是有清

RocketMQ 作为一款消息中间件,它的信息的投递与消费,通常都会与数据库的更新进行挂钩,那么如何保证 消息和数据库的更新是一个原子性的操作呢?

比如在我数据库更新失败的时候,不进行消息的发送,或者在我发送消息失败的时候,数据库的更新进行回滚

这边总结三种最佳实践,提供给大家作为参考学习

1. 错误思路

正确方案分享之前,先分享一个错误做法,看下你是否踩坑

1.png

一旦发送 mq 超时,抛出异常,整体事务回滚,下游感知到订单更新成功,但是数据库依旧还是之前的状态,就出现了数据不一致的情况了

2. 基于 RocketMQ 的分布式事务

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性

2.1 事务消息交互流程

image-20240714141258260.png

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

2.2 代码示例

2.png

2.3 注意事项

避免大量未决事务导致超时

Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
  • 程序能正确识别正在进行中的事务。

3. 基于本地事务的一致性处理

在Spring框架中基于本地消息服务实现MQ最终一致性的过程大致如下:

  1. 业务数据更新:业务服务处理业务逻辑,在事务的保护下更新数据库中的业务数据。
  2. 创建本地消息记录:在同一个业务事务中,业务服务创建本地消息记录,并保存到数据库。这条记录包含了将来要发送到消息队列(MQ)的消息。
  3. 业务事务提交:业务事务成功提交,确保业务数据的更新和消息记录的创建在数据库中保持原子性和一致性。
  4. 异步发送消息:通过定时任务或者事件驱动机制,一个独立于业务逻辑的消息服务定期轮询数据库中的消息记录表,查找处于未发送状态的消息。
  5. 发送MQ消息:消息服务从本地消息表中取出消息记录,向MQ发送消息。成功发送后,更新本地消息记录的状态以表示消息已成功发出。
  6. 处理发送失败:若消息发送失败,比如出现网络问题或者MQ不可用,消息服务记录失败事件,并可根据配置进行重试,直至消息发送成功或者达到最大重试次数。
  7. 消息消费者处理:MQ消费者接收并处理消息。若成功处理(例如,执行了一些跨服务的业务逻辑),则结束。若消费失败(比如业务逻辑异常或者临时错误),通常MQ会重新入队此消息,进行后续重试直至成功或达到重试上限。
  8. 消息状态回查:对于在发送过程中失败或者状态模糊的消息,可能还需要一个检查机制。消息服务可能会对这些消息进行回查,重新核实业务逻辑的完成情况,并决定是否重新发布消息到MQ。

整个过程用于确保即使发生系统故障,也能够通过后续的消息重发和消费重试来达成最终数据的一致性。

3.1 时序说明

3.png

3.2 伪代码

5.png

4. 基于 binlog 变动实现消息一致性

基于监听数据库的binlog(二进制日志)来实现消息一致性是一种有效的技术手段,尤其是在需要跨多个数据库和服务进行数据同步时。这种方法通常称为Change Data Capture(CDC),它能够监听数据库的变更事件,并将这些事件转换为消息,然后传递到消息队列系统中,以便其他服务进行消费。

以下是基于监听binlog实现消息一致性的大致步骤:

  1. 监听数据库的binlog:需要一种机制或工具来监听数据库的binlog。这些工具可以是开源的如Debezium,Maxwell或者商业产品。
  2. 变更事件捕获:当数据库发生写入、更新或删除操作时,这些变更会被记录在binlog中。监听工具捕获这些事件并生成相应的变更事件。
  3. 变更事件发布:捕获的变更事件被发布到消息队列系统中。
  4. 处理消息队列中的事件:其他服务作为消息消费者,从消息队列中读取并处理这些变更事件,实现数据的最终一致性。
  5. 幂等性和错误处理:为了确保系统的健壮性,消费者服务应该具备正确处理重复消息的能力(幂等性)以及恰当的错误处理机制。

4.1 时序说明

6.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/386605.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学习测试12-车(略)

系统讲解,可以在懂车帝网站去了解汽车结构

用AI做玄学壁纸!多篇笔记爆火,直接变现,轻松日入1000+

玄学是这两年赚钱的大风口,特别是80后、90后和00后这些年轻一代,他们对于个人财运、事业发展、爱情关系以及健康状态的预测和优化表现出浓厚的兴趣,希望通过这些方式来提升生活质量和实现个人目标。 今天就来给大家拆解其中一个赛道—用AI做…

信息安全工程师下午题

试题一(共 20 分) 阅读下列说明和图,回答问题 1 至问题 5,将解答填入答题纸的对应栏内。【说明】已知某公司网络环境结构主要由三个部分组成,分别是 DMZ 区、内网办公区和生产区,其拓扑结构如图 1-1 所示。信息安全部的王工正在按…

【BES2500x系列 -- RTX5操作系统】系统执行流程 -- 引导程序(boot loader)--(十)

💌 所属专栏:【BES2500x系列】 😀 作  者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! &#x1f49…

地球磁场的形成、变迁、特点

还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas,webgl,ech…

Unity多客户端位置同步信息

书接上文,有了一个基本的网络同步消息的服务器,客户端这边其实要做的工作就简单许多。 如果对位置信息的保密程度没那么高的话,可以放在客户端处理这部分的逻辑。 即一个客户端移动的时候,另一个客户端跟着移动,基本…

【电控笔记-xuan】各种估测器扰动估计性能比较

各种扰动观测器观测结果 蓝色: 扰动值 隆博戈估测器扰动补偿 论文53disturb扰动补偿 2order eso 观测

LabVIEW学习-LabVIEW处理带分隔符的字符串从而获取数据

带分隔符的字符串很好处理,只需要使用"分隔符字符串至一维字符串数组"函数或者"一维字符串数组至分隔符字符串"函数就可以很轻松地处理带分隔符地字符串。 这两个函数所在的位置为: 函数选板->字符串->附加字符串函数->分…

APT 安装软件详细教程

文章目录 APT 安装软件详细教程APT 概述APT 的基本命令APT 命令详解安装软件包更新和升级软件包删除软件包搜索和查找软件包管理软件包依赖清理软件包缓存APT 配置软件源配置自定义软件源常见问题及解决方案解决软件包依赖问题处理软件源错误其他常见问题使用 APT 的最佳实践总…

在Postman中引用JS库

前言 在做接口测试时,出于安全因素,请求参数需要做加密或者加上签名才能正常请求,例如:根据填写的请求参数进行hash计算进行签名。postman作为主流的接口调试工具也是支持请求预处理的,即在请求前使用JavaScript脚本对…

昇思MindSpore学习入门-自动混合精度

混合精度(Mix Precision)训练是指在训练时,对神经网络不同的运算采用不同的数值精度的运算策略。在神经网络运算中,部分运算对数值精度不敏感,此时使用较低精度可以达到明显的加速效果(如conv、matmul等&am…

OSI七层模型详解

OSI七层模型 OSI(Open System Interconnect),即开放式系统互连。 一般都叫OSI参考模型,是ISO组织在1985年研究的网络互连模型。该体系结构标准定义了网络互连的七层框架(物理层、数据链路层、网络层、传输层、会话层、…

[玄机]流量特征分析-常见攻击事件 tomcat

[玄机]流量特征分析-常见攻击事件 tomcat 题目做法及思路解析(个人分享) Tomcat是一个开源的Java Servlet容器,它实现了Java Servlet和JavaServer Pages (JSP) 技术,提供了一个运行这些应用程序的Web服务器环境。Tomcat由A…

go程序在windows服务中优雅开启和关闭

本篇主要是讲述一个go程序,如何在windows服务中优雅开启和关闭,废话不多说,开搞!!!   使用方式:go程序 net服务启动 Ⅰ 开篇不利 Windows go进程编译后,为一个.exe文件,直接执行即…

语言转文字

因为工作原因需要将语音转化为文字,经常搜索终于找到一个免费的好用工具,记录下使用方法 安装Whisper 搜索Colaboratory 右上方链接服务 执行 !pip install githttps://github.com/openai/whisper.git !sudo apt update && sudo apt install f…

NSSRound#4 Team

[NSSRound#4 SWPU]1zweb 考察&#xff1a;phar的反序列化 1.打开环境&#xff0c;审计代码 1.非预期解 直接用file伪协议读取flag,或直接读取flag file:///flag /flag 2.正常解法 用读取文件读取index.php,upload.php的源码 index.php: <?php class LoveNss{publi…

hadoop学习(一)

一.hadoop概述 1.1hadoop优势 1&#xff09;高可靠性&#xff1a;Hadoop底层维护多个数据副本&#xff0c;即使Hadoop某个计算元素或存储出现故障&#xff0c;也不会导致数据的丢失。 2&#xff09;高扩展性&#xff1a;在集群间分配任务数据&#xff0c;可方便扩展数以千计…

c++ 内存管理(newdeletedelete[])

因为在c里面新增了类&#xff0c;所以我们在有时候会用malloc来创建类&#xff0c;但是这种创建只是单纯的开辟空间&#xff0c;没有什么默认构造的。同时free也是free的表面&#xff0c;如果类里面带有指针指向堆区的成员变量就会free不干净。 所以我们c增加了new delete和de…

Python --Pandas库基础方法(2)

文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…