分布式系统事务一致性解决方案(基于事务消息)

参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

文章目录

    • 概要
    • 错误的方案
    • 方案一:业务方自己实现
    • 方案二:RocketMQ 事务消息
      • 什么是事务消息
      • 事务消息处理流程
      • 事务消息生命周期
      • 使用限制
      • 使用示例
      • 使用建议


概要

说起分布式事务,就会谈到 Bob 给 Smith 账户转账的问题:2 个账号, 分别处于 A、B 两个 DB,或者说 2 个不同的子系统中,Bob 的账户要扣钱,Smith 的账户要加钱,如何保证原子性?

一般思路是通过消息中间件来实现 “最终一致性”:A系统扣钱,然后发送消息给 MQ,B系统接收此消息,进行加钱。

但这里有个问题:是先update DB后发消息?还是先发消息后update DB呢?

  • 假设先update DB成功发送消息时网络问题,重发又失败,怎么办?
  • 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都会有问题。

错误的方案

有人可能会想到,可以把 “发送消息” 这个网络调用和 update DB 放在同一个事务中,如果发送消息失败,update DB 自动回滚。这样不就保证原子性了吗?

这个方案看似正确,但其实是错误的,原因:

  1. 把网络调用放在 DB 事务中,可能会因为网络的延时,导致 DB 长事务。严重的还会 block 整个 DB,这个风险很大。
  2. 如果发送消息失败,发送方并不知道是 MQ 没收到消息,还是已收到消息只是返回 response 的时候失败了。如果接收方已经收到消息了,而发送方认为没有收到,执行 update DB 回滚的操作,会导致 A 账户钱没有扣,B 账户的钱却增加了。

方案一:业务方自己实现

假设消息中间件没提供 “事务消息” 功能,比如 Kafka。那该如何解决这个问题?

结局方案如下:

  1. Producer 准备 1 个消息表,把 update DB 和 send message 这 2 个操作,放在一个DB事务中。
  2. 准备一个后台程序,不停地把消息表中的 message 传送给消息中间件。发送失败则不断重试,直到成功。允许消息重复,但消息不会丢。
  3. Consumer 准备 1 个判重表。处理过的消息记录在里面,实现业务的幂等性

通过以上 3 步,就基本解决了原子性问题

但此方案缺点是:需要设计 DB 消息表,同时还需要 1 个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。

方案二:RocketMQ 事务消息

目前各大知名电商平台和互联网公司,几乎都是使用 “事务消息” 这种方案来实现 “最终一致性” 的。这种方式适用的业务场景广泛,且比较靠谱。

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

在这里插入图片描述

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查(服务端回查的间隔时间和最大回查次数,请参见参数限制)。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

在这里插入图片描述

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

  1. 消息类型一致性
    事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

  2. 消费事务性
    Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  3. 中间状态可见性
    Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  4. 事务超时机制
    Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.type为TRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。*/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议

  1. 避免大量未决事务导致超时

    Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

  2. 正确处理"进行中"的事务

    消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

    • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

    • 程序能正确识别正在进行中的事务。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/315844.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单纯形投影算法

目录 一&#xff0c;任意点到平移坐标轴面的投影 1&#xff0c;求解目标 2&#xff0c;转换变量 3&#xff0c;求解结果 4&#xff0c;f(t)的导数 5&#xff0c;f(t)的最小值 二&#xff0c;任意点到标准单纯形的投影 1&#xff0c;求解目标 2&#xff0c;公式变形 3…

制作一个RISC-V的操作系统十四-任务同步和锁

文章目录 并发与同步临界区和锁锁死锁解决死锁自旋锁&#xff08;spin lock&#xff09;原子性问题原子操作实现amoswap.w.aq例子 另一种方法自旋锁的注意事项代码其他同步技术 并发与同步 控制流&#xff1a;可理解为任务或进程 中断也可以理解为一个切换到另一个任务&#…

LeetCode 226.翻转二叉树

题目描述 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 1&#xff1a; 输入&#xff1a;root [4,2,7,1,3,6,9] 输出&#xff1a;[4,7,2,9,6,3,1]示例 2&#xff1a; 输入&#xff1a;root [2,1,3] 输出&#xff1a;[2,3,1]示例…

CMake+qt+Visual Studio

#使用qt Creator 创建Cmake 项目,使用Cmake Gui 生成sln 工程&#xff0c;使用Visual Studio 开发 ##使用qt Creator 创建CMake项目 和创建pro工程的步骤一致&#xff0c;只是在选择构建系统的步骤上选择CMake,接下来步骤完全相同 工程新建完成之后&#xff0c;构建cmake 项…

想冲宇宙厂,直接挂了。。。

宇宙厂实际是字节&#xff0c;这个称呼是因为字节跳动主宰了宇宙内一切App&#xff0c;有点家大业大的意思。 今天分享一位字节春招凉经&#xff0c;问了一些数据库和Java八股&#xff0c;没出算法题&#xff0c;直接挂了&#xff0c;竟然最喜欢出算法题的字节&#xff0c;这次…

刷代码随想录有感(49):找树左下角的值

题干&#xff1a; 用层序遍历方便些&#xff0c;因为只需要把res不断替换成每一层第一个节点值即可&#xff0c;代码如下&#xff1a; class Solution { public:int findBottomLeftValue(TreeNode* root) {queue<TreeNode*>que;if(root ! NULL)que.push(root);int res …

C++中的数据结构与算法

随处可见的红黑树 一般会用到[key,value]。 例如github中这个例子&#xff0c;第一个是访问网站&#xff0c;第二个是访问次数&#xff0c;但是这个不是静态的&#xff0c;这有个动态排序&#xff0c;并且当我们需要让相应的访问次数加1的时候&#xff0c;我们用红黑树查找的时…

C++每日一练——杨辉三角

给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例 2: 输入: numRows 1 输出: [[1]]提示: 1 <…

Jsoncpp搭建交叉编译环境(移植到arm)

1. 官网下载源码 github地址&#xff1a;GitHub - open-source-parsers/jsoncpp at update 2. 交叉编译环境 当前平台/开发平台-编译环境&#xff1a; [rootlocalroot ~]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) [rootlocalroot ~]# uname -a Lin…

ELK创建仪表盘

创建仪表盘步骤&#xff1a; 一、保存search二、生成饼图三、创建仪表盘 一、保存search 首先保存一段时间内的search&#xff0c;可以添加想要的字段&#xff0c;并保存这个search方便下次直接打开该search&#xff0c;并方便在可视化和仪表盘中使用该search. 二、生成饼图…

Mac安装telnet

一、安装Homebrew 1、打开官网&#xff1a;Homebrew — The Missing Package Manager for macOS (or Linux) 2、打开终端输入&#xff1a; /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 二、安装Telnet bre…

数字文旅重塑旅游发展新格局:以数字化转型为突破口,提升旅游服务的智能化水平,为游客带来全新的旅游体验

随着信息技术的迅猛发展&#xff0c;数字化已成为推动各行各业创新发展的重要力量。在旅游业领域&#xff0c;数字文旅的兴起正以其强大的驱动力&#xff0c;重塑旅游发展的新格局。数字文旅以数字化转型为突破口&#xff0c;通过提升旅游服务的智能化水平&#xff0c;为游客带…

急急急!微信朋友圈删除了怎么恢复?

微信朋友圈是我们与朋友分享生活点滴的重要平台&#xff0c;但有时候微信出现异常&#xff0c;导致我们编辑好的朋友圈被删除了&#xff0c;这时候该怎么办呢&#xff1f; 幸运的是&#xff0c;微信提供了一种简单的方式来恢复已删除的朋友圈内容。微信朋友圈删除了怎么恢复&a…

四信数字孪生水库解决方案,加快构建现代化水库运行管理矩阵

近年&#xff0c;水利部先后出台《关于加快构建现代化水库运行管理矩阵的指导意见》与《构建现代化水库运行管理矩阵先行先试工作方案》等文件&#xff0c;明确总体要求及试点水库、先行区域建设技术要求等&#xff0c;为全面推进现代化水库运行管理矩阵建设工作提供依据。 《2…

【Elasticsearch<一>✈️✈️】简单安装使用以及各种踩坑

目录 &#x1f378;前言 &#x1f37b;一、软件安装&#xff08;Windows版&#xff09; 1.1、Elasticsearch 下载 2.1 安装浏览器插件 3.1、安装可视化工具 Kibana 4.1、集成 IK 分词器 &#x1f37a;二、安装问题 &#x1f379;三、测试 IK 分词器 ​&#x1f377; 四、章…

spi接口的基本概念、引脚定义及注意事项

目录 基本概念 引脚定义 注意事项 SPI&#xff08;Serial Peripheral Interface&#xff09;是一种同步串行接口技术&#xff0c;广泛应用于微控制器和各种外围设备之间的短距离通信。 基本概念 SPI接口允许微控制器以串行方式与一个或多个外围设备进行通信。它是一种高速、…

统信UOS下如何配置全局搜索

原文链接&#xff1a;统信UOS下如何配置全局搜索 Hello&#xff0c;大家好啊&#xff01;全局搜索是操作系统中非常实用的一个功能&#xff0c;它可以帮助我们快速定位文件、程序和其他资源。今天&#xff0c;我要给大家介绍如何在统信UOS桌面操作系统上配置全局搜索&#xff0…

IT廉连看——UniApp——样式绑定

IT廉连看——UniApp——样式绑定 一、样式绑定 两种添加样式的方法&#xff1a; 1、第一种写法 写一个class属性&#xff0c;然后将css样式写在style中。 2、第二种写法 直接把style写在class后面 添加一些效果&#xff1a;字体大小 查看效果 证明这样添加样式是没有问题的…

基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度(matlab代码)

目录 1 主要内容 系统结构图 P2G-CCS 耦合模型 系统掺氢分析 其他算例对比 2 部分代码 3 下载链接 1 主要内容 该程序复现《基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度》模型&#xff0c;以碳交易和碳封存成本、燃煤机组启停和煤耗成本、弃风成本、购…

数据结构-二叉搜索树(BST)

目录 什么是二叉搜索树 二叉搜索树的特性 (1)顺序性 (2)局限性 二叉搜索树的应用 二叉搜索树的操作 (1)查找节点 (2)插入节点 (3)删除节点 (4)中序遍历 什么是二叉搜索树 如图所示&#xff0c;二叉搜索树&#xff08;binary search tree&#xff09;满足以下条件。…