命运交织的节点:分布式事务最终一致性的心跳共鸣纪实

关注微信公众号 “程序员小胖” 每日技术干货,第一时间送达!

引言

在当今云计算和微服务架构大行其道的时代,分布式系统成为了构建高可用、高性能应用的基石。然而,随着系统规模的扩张,数据的一致性问题如同幽灵般萦绕在每位架构师心头,尤其是分布式事务处理中的挑战更是首当其冲。今天,让我们一起深入探索分布式事务模型中的“最终一致性”,揭开它那既神秘又强大的面纱。

分布式事务的挑战与背景

想象一下双十一购物节,数百万用户同时下单,订单系统、库存系统、支付系统等多个服务间需要协同完成交易。采用最终一致性模型,即使瞬间请求激增导致部分操作延迟,系统也能确保在合理的时间框架内调整库存、确认订单状态,从而维持整体业务流程的顺畅。

最终一致性?

在分布式系统中,最终一致性是一种事务模型,它保证系统中的所有数据副本最终会达到一致的状态,但不保证立即的一致性。这种模型允许在数据复制过程中存在短暂的不一致状态,但随着时间的推移,系统会通过各种机制确保数据最终达到一致。

实现策略

补偿事务(TCC)

TCC,即Try-Confirm-Cancel,是一种通过预先定义的确认和取消操作来保证事务最终一致性的模式。

**Try 阶段:**调用 Try 接口,尝试执行业务,完成所有业务检查,预留业务资源。
Confirm 或 Cancel 阶段:两者是互斥的,只能进入其中一个,并且都满足幂等性,允许失败重试。

**Confirm 操作:**对业务系统做确认提交,确认执行业务操作,不做其他业务检查,只使用 Try 阶段预留的业务资
源。

**Cancel 操作:**在业务执行错误,需要回滚的状态下执行业务取消,释放预留资源。

转账场景示例:


//Account类代表一个账户,拥有冻结、解冻、存款和取款的方法。Money类代表金额。
public class AccountService {// Try阶段:检查账户余额并冻结资金public boolean prepareTransfer(Account source, Account target, Money amount) {if (source.getBalance() < amount) {return false; // 余额不足}source.freeze(amount); // 冻结资金return true;}// Confirm阶段:实际转账操作public void confirmTransfer(Account source, Account target, Money amount) {source.withdraw(amount); // 从源账户扣除金额target.deposit(amount); // 向目标账户增加金额}// Cancel阶段:回滚操作,解冻资金public void cancelTransfer(Account source, Account target, Money amount) {source.unfreeze(amount); // 解冻资金}
}

注意事项

**幂等性:**确保Try、Confirm和Cancel操作都是幂等的,以支持重复执行而不会引起副作用。

**空回滚:**系统应能够处理“空回滚”的情况,即Cancel操作被调用,但Try操作并未实际执行。

**防悬挂:**确保系统能够处理悬挂操作,即Try操作在网络延迟后到达,而Cancel操作已经执行。

本地消息表

本地消息表的方案最初是由 ebay 的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理,通过消息日志的方式来异步执行。本地消息表是一种业务耦合的设计,消息生产方需要额外建一个事务消息表,并记录消息发送状态,消息消费方需要处理这个消息,并完成自己的业务逻辑,另外会有一个异步机制来定期扫描未完成的消息,确保最终一致性。

实战代码示例:

  1. 系统收到下单请求,将订单业务数据存入到订单库中,并且同时存储该订单对应的消息数据,比如购买商品的 ID 和数量,消息数据与订单库为同一库,更新订单和存储消息为一个本地事务,要么都成功,要么都失败。
   @Servicepublic class OrderService {@Resourceprivate OrderMapper orderMapper;@Resourceprivate OrderMessageMapper orderMessageMapper;@Autowiredprivate MessageProducer messageProducer; // 消息队列的发送器@Transactionalpublic void placeOrder(Order order, OrderMessage orderMessage) {// 将订单业务数据存入到订单库中int orderRows = orderMapper.insert(order);// 同时存储该订单对应的消息数据int messageRows = orderMessageMapper.insert(orderMessage);// 确保订单和消息数据都成功插入if (orderRows == 1 && messageRows == 1) {// 发送库存更新消息到消息队列messageProducer.sendMessage(orderMessage);} else {// 如果任何插入失败,抛出异常以回滚事务throw new RuntimeException("Order or message data insertion failed");}}}
  1. 库存服务更新
    @Autowiredprivate InventoryDomainService inventoryDomainService;@Overridepublic boolean consume(MqMessageEntity<OrderMessage> mqMessageEntity) {log.info("接收订单支付完成请求,扣件库存:{}", JSON.toJSONString(mqMessageEntity));return inventoryDomainService.deductionInventory(mqMessageEntity);}
  1. 订单服务更新本地消息表
        @Autowiredprivate OrderMessageMapper orderMessageMapper;public void sendMessage(OrderMessage orderMessage) {// 向消息队列发送库存更新消息// 消息发送成功的回调中更新本地消息表状态orderMessageMapper.upodateMessageStatus(orderMessage);}
  1. 异步任务重试机制
    使用Spring的@Scheduled注解来定时触发异步任务。这个地方用任何调度计划都可以实现 我用的是spring自带的@Scheduled注解实现的。异步技术也可以根据自己的情况选择。

@Component
public class MessageRetryScheduler {@Autowiredprivate MessageProducer messageProducer;@Scheduled(fixedRate = 60000) // 每60秒执行一次public void scheduleMessageRetry() {messageProducer.scanAndRetryUnsentMessages();}
}@Autowiredprivate OrderMessageMapper orderMessageMapper;@Asyncpublic void scanAndRetryUnsentMessages() {List<OrderMessageDO> unsentMessages = orderMessageMapper.queryByStatus("PENDING");for (OrderMessage message : unsentMessages) {try {sendMessage(message); // 重试发送消息orderMessageMapper.updateStatus(message);} catch (Exception e) {// 可以选择更新状态为错误或其他逻辑orderMessageMapper.updateStatus(message);}}}

RocketMQ 事务消息

RocketMQ 事务消息是一种支持分布式事务的消息。它通过引入 prepare、commit 和 rollback 三个阶段,来确保事务消息的一致性。

**prepare 阶段:**消息发送方发送半消息,此时消息的状态为“待提交”。

**commit 阶段:**消息发送方向 RocketMQ 发送 commit 消息请求,RocketMQ 判断此时半消息是否被确认,如果半消息已被确认,则将消息标记为“可消费”并提交事务。如果半消息未被确认,则将消息标记为“不可消费”并终止事务。

**rollback 阶段:**消息发送方向 RocketMQ 发送 rollback 消息请求,RocketMQ 将半消息标记为“不可消费”并回滚
事务。


代码示例:

创建并初始化一个事务消息生产者:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class TransactionProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者,并指定NameServer地址TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");// 指定事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑,例如数据库操作// 假设执行成功,返回Commit状态return LocalTransactionState.CommitMessage;}@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务状态,确认是否需要提交或回滚// 这里可以根据业务逻辑来实现检查// 假设检查通过,返回Unknown状态,让消息服务决定是提交还是回滚return LocalTransactionState.Unknown;}});// 启动生产者producer.start();// 创建消息Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());// 发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}
}

在代码示例中我们实现了TransactionListener接口的两个方法:

**executeLocalTransaction:**执行本地事务逻辑,返回事务状态。如果本地事务执行成功,返回CommitMessage;如果执行失败,返回RollbackMessage。

**checkLocalTransaction:**检查本地事务状态。如果事务状态未知,返回Unknown,让消息队列服务决定是提交还是回滚消息。

最大努力通知

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低
的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。
最大努力通知型的实现方案,一般符合以下特点:

  1. 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失(不可靠消息)。
  2. 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息

代码示例:

发送通知

@Service
public class NotificationService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendNotification(String message) {// 发送通知消息到MQrabbitTemplate.convertAndSend("notificationExchange", "notificationRoutingKey", message);}
}

监听消息并处理

@Component
public class NotificationListener {@RabbitListener(queues = "notificationQueue")public void handleNotification(String message) {// 处理消息,例如更新库存processNotification(message);// 确认消息已处理acknowledgeMessage();}private void processNotification(String message) {// 业务逻辑处理}private void acknowledgeMessage() {// 向发送方确认消息已处理的逻辑}
}

重试机制通常由消息中间件提供,如RabbitMQ的死信队列和重试策略。校对机制可能需要额外的接口和逻辑来实现。

结语

最终一致性作为分布式系统中一种重要的事务处理哲学,它在实践中展现出了强大的生命力。然而,没有银弹存在,每种模型都有其适用场景与局限。作为技术探索者,我们应当持续思考如何更精细地控制一致性级别,结合业务特性量体裁衣,设计出既能满足业务需求又能保持系统弹性的解决方案。那么,您在实际项目中遇到过哪些分布式事务的挑战?对于最终一致性模型又有何独到见解或疑问呢?欢迎留言讨论,共同推进技术的边界。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/322845.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mib browser读取mib文件的oid(飞塔防火墙为例)

在配置zabbix监控的时候,配置监控项最为麻烦,一般我们都会套用模板,这种方式比较简单,但是有些设备就是没有现成的zabbix模板,怎么办? 今天我们使用MIB Browser来获取设备SNMP的OID,然后加入zabbix 。 1.什么是MIB Browser SNMP客户端工具MIB Browser, 全名iReasonin…

华为手机 鸿蒙系统-android studio识别调试设备,开启adb调试权限

1.进入设置-关于手机-版本号&#xff0c;连续点击7次 认证&#xff1a;有锁屏密码需要输入密码&#xff0c; 开启开发者配置功能ok 进入开发者配置界面 打开调试功能 重新在androd studio查看可运行running devices显示了&#xff0c; 不行的话&#xff0c;重启一下android …

照片生成ai漫改头像生成漫画全套教程免费(自取)

今天给大家分享一一个AI漫改头像&#xff0c;轻松日增1000&#xff0c;简单操作好上手的一个互联网新项目&#xff0c;哈那其实AI漫改头像也火了差不多有半年左右了&#xff0c; 那其实利用AI软件将真人的照片生成漫画的形象&#xff0c;这个看起来很简单的方法却在小红书上大…

Vue入门到关门之Vue3学习

一、常用API 注意&#xff1a;本文项目均使用脚手架为 Vite 1、setup函数 &#xff08;1&#xff09;介绍 如果在项目中使用配置项API&#xff0c;那么写起来就和vue2的写法是一样的&#xff1b;但是如果在项目中写的是组合式API&#xff0c;那么组件中所用到的&#xff1a…

《架构即未来》读后感

目录 一、引言 二、《架构即未来》读后感 1、主题的简要介绍 2、我的看法和理解 3、作者的优点和传递的信息 4、思想如何适用于当今社会 三、《架构即未来》对于企业发展的影响具体体现在哪些方面&#xff1f; 一、引言 任何一个持续成长的公司最终都需要解决系统、组织…

关于GitHub仓库建立及提交问题

文章目录 前言GitHub仓库创建token令牌的获取GitHub克隆到本地GitHub上传文件 前言 为了整一个GitHub仓库然后上传文件&#xff0c;笔者看了不下100篇博客&#xff0c;20段教程&#xff0c;最后在两位大佬的帮助下&#xff0c;才整明白了&#x1f62d; 先提前说一嘴从 2021年8月…

python实现txt文件内容对比功能

欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一.前言 二.代码 三.演示 四.代码分析 一.前言 内容对比是一种常见的信息分析和研究方法,主要涉及对不同来源、类型或版本的内容进行比

【JAVA】类加载过程,以及类加载器

类加载过程&#xff0c;以及类加载器 一、类加载的过程二、类加载器介绍三、跨类加载三、举例说明 一、类加载的过程 类加载是Java虚拟机&#xff08;JVM&#xff09;将类文件加载到内存中并转换成对应的类对象的过程。它确保了类文件能够正确加载并转换成可执行的类对象&…

UE4_Water插件_Buoyancy组件使用

water插件提供了一个浮力Actor蓝图类。 需要注意的几个问题: 1、StaticMesh需要替换根组件。 2、需要模拟物理设置质量。 3、需要添加浮力组件,设置浮力点,应用水中牵引力。

Unity Animation--动画剪辑

Unity Animation--动画剪辑 动画剪辑 动画剪辑是Unity动画系统的核心元素之一。Unity支持从外部来源导入动画&#xff0c;并提供创建动画剪辑的能力使用“动画”窗口在编辑器中从头开始。 外部来源的动画 从外部来源导入的动画剪辑可能包括&#xff1a; 人形动画 运动捕捉…

【Linux】Linux——Centos7安装RabbitMQ

目录 安装包准备socaterlang 安装rabbitmq安装命令启动rabbitmq&#xff0c;两种方式查看rabbitmq 启动后的情况配置并开启网页插件关闭防火墙或开放端口测试登录问题配置web端访问账号密码和权限添加用户&#xff0c;后面两个参数分别是用户名和密码.添加权限修改用户角色再次…

24_Scala集合Map

文章目录 Scala集合Map1.构建Map2.增删改查3.Map的get操作细节 Scala集合Map –默认immutable –概念和Java一致 1.构建Map –创建kv键值对 && kv键值对的表达 –创建immutable map –创建mutable map //1.1 构建一个kv键值对 val kv "a" -> 1 print…

十四五”智慧城市:视频大数据汇聚系统2.0建设方案与特点分析

一、背景需求分析 随着科技的不断发展&#xff0c;智慧城市的建设已经成为城市发展的重要方向。视频汇聚系统作为智慧城市建设的重要组成部分&#xff0c;已经得到了广泛的应用和推广。视频汇聚系统是智慧城市中非常重要的组成部分&#xff0c;它利用摄像头和传感器技术来收集…

压缩机继电器EOCRDS-30NY7Q升级后型号:EOCRDS3-30S

EOCR-DS3系列型号&#xff1a; EOCRDS3-05S EOCRDS-05S EOCRDS1-05S EOCRDS3-30S EOCRDS-30S EOCRDS1-30S EOCRDS3-60S EOCRDS-60S EOCRDS1-60S EOCRDS3-05W EOCRDS-05W EOCRDS1-05W EOCRDS3-30W EOCRDS-30W EOCRDS1-30W EOCRDS3-60W EOCRDS-60W EOCRDS1-60W EOCR-DS3T-…

分享10个高质量宝藏网站~

分享一波高质量宝藏网站~ 这10个宝藏网站&#xff0c;个个都好用到爆&#xff0c;娱乐、办公、学习都能在这里找到&#xff01; 1、Z-Library https://zh.zlibrary-be.se/ 世界最大的免费电子书下载网站&#xff01;电子书资源超千万&#xff0c;不过这个网站不太稳定&#…

学习torchmd分子动力学模拟

TorchMD打算提供一种简单易用的API&#xff0c;用于使用PyTorch进行分子动力学。这使研究人员能够更快地进行力场开发研究&#xff0c;并以PyTorch的简单性和强大性将神经网络潜力无缝集成到动力学中。 TorchMD使用与经典MD代码&#xff08;如ACEMD&#xff09;一致的化学单位&…

数据结构——二叉排序树

懒猫老师-数据结构-(58)二叉排序树的删除(二叉查找树)_哔哩哔哩_bilibili 概念 (1)若它的左子树不空&#xff0c;则左子树上所有结点的值均小于根结点的值; (2)若它的右子树不空&#xff0c;则右子树上所有结点的值均大于根结点的值; (3)它的左右子树也都是二叉排序树。 通…

SMB 协议详解之-TreeID原理和SMB数据包分析技巧

在前面分析SMB协议数据包的过程中,这里,可以看到在SMB协议中存在很多的ID,即Unique Identifiers。那么这些ID表示什么含义?在实际分析数据包的过程中如何根据这些ID进行过滤分析?本文将介绍SMB/SMB2中的tree id ,并介绍如何通过tree id 快速的分析SMB数据包中各种命令交互…

配置完串口,用了printf函数,LCD屏不显示了

配置完串口后&#xff0c;记得打开微库&#xff01; /* USER CODE BEGIN PFP */u8 rx[2];u8 rx_buff[10];int fputc(int c, FILE* s){u8 ch[1] {c};HAL_UART_Transmit(&huart1,ch, 1, 0xfff);return c; } /* USER CODE END PFP */

数据结构-链表练习(面试题)

1&#xff0c;翻转一个单链表 建立变量cur指向第二个节点&#xff0c;curN指向cur.next&#xff0c;将第二个节点的next改为head&#xff0c;headcur这样实现&#xff0c;前两个节点顺序的翻转&#xff0c;第二个节点指向了第一个节点&#xff0c;之后cur向后移&#xff08;cu…