基于本地事务表+MQ实现分布式事务

基于本地事务表+MQ实现分布式事务

  • 引言
    • 1、原理
    • 2、本地消息表优缺点
    • 3、本地启动rocketmq
    • 4、代码实现及验证
      • 4.1、核心代码
      • 4.2、代码执行流程
      • 4.3、项目结构
      • 4.4、项目源码

引言

本地消息表的方案最初由ebay的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理。本地消息表实现最终一致性。本文主要学习mallchat开源项目中的实现方案,向mallchat开发团队致敬!

1、原理

分布式事务解决方案有许多比如二阶段提交、TCC、最大努力通知、Saga事务等,本文介绍本地消息表+MQ这种方式解决分布式事务消息最总一致性问题。目前利用本地消息表+MQ方案实现最终消息一致性的比较多,它的核心思想是,将分布式事务拆分成本地事务进行处理,不同事务之间通过消息表和MQ通信,最后通过定时任务扫描失败的数据进行重试,当在有效重试次数限制内,再次重试回调失败的数据,最终实现消息重复发送,达到一致性。
在这里插入图片描述
在这里插入图片描述

2、本地消息表优缺点

本地消息表实现了分布式事务的最终一致性,优缺点比较明显。
优点
1.实现逻辑简单,开发成本比较低
缺点
1.与业务场景绑定,高耦合,不可公用
2.本地消息表与业务数据表在同一个库,占用业务系统资源,量大可能会影响数据库性能

3、本地启动rocketmq

首先本地启动rocketmq服务。

1:在D:\rocketmq-all-5.3.1-bin-release\bin目录下,执行cmd命令
启动服务
start mqnamesrv.cmd
启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
关闭服务
mqshutdown namesrv
关闭broker
mqshutdown broker
2:启动rocketmq-dashboard项目,输入127.0.0.1:8080进入可视化界面

其中rocketmq-dashboard可视化代码参考windows下安装启动rocketmq可视化界面
将rocketmq-dashboard导入到idea中,在idea编译启动。
在这里插入图片描述

4、代码实现及验证

4.1、核心代码

(1)注解SecureInvoke

/*** 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。** @author hauhua* @DATE 2025/1/21*/
@Retention(RetentionPolicy.RUNTIME)//运行时生效
@Target(ElementType.METHOD)//作用在方法上
public @interface SecureInvoke {/*** 默认3次** @return 最大重试次数(包括第一次正常执行)*/int maxRetryTimes() default 3;/*** 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。* 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。** @return 是否异步执行*/boolean async() default true;
}

(2)自定义切面SecureInvokeAspect

/*** 安全执行切面** @author hauhua* @DATE 2025/1/21*/
@Slf4j
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行
@Component
public class SecureInvokeAspect {@Autowiredprivate SecureInvokeService secureInvokeService;@Around("@annotation(secureInvoke)")public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable {boolean async = secureInvoke.async();boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();// 非事务状态,直接执行,不做任何保证。if (!inTransaction) {return joinPoint.proceed();}// 如果是事务状态,保证保存到本地消息表中的数据和保存在数据库中数据,在同一个事务内Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();List<String> parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList());SecureInvokeVo dto = SecureInvokeVo.builder().args(JsonUtils.toStr(joinPoint.getArgs())).className(method.getDeclaringClass().getName()).methodName(method.getName()).parameterTypes(JsonUtils.toStr(parameters)).build();SecureInvokeRecord record = SecureInvokeRecord.builder().secureInvokeVo(dto).maxRetryTimes(secureInvoke.maxRetryTimes()).nextRetryTime(DateUtil.offsetMinute(new Date(), (int) SecureInvokeService.RETRY_INTERVAL_MINUTES)).createTime(new Date()).updateTime(new Date()).build();secureInvokeService.invoke(record, async);return null;}
}

(3)控制层MessageController

/*** 消息管理** @author hauhua* @DATE 2025/1/21**/
@RestController
public class MessageController {@Autowiredprivate MQProducerService mqProducerService;@RequestMapping("sendMsg")@Transactionalpublic void sendMsg(@RequestParam String topic, @RequestBody Object body) {mqProducerService.sendSecureMsg(topic, body,"test");}
}

(4)消息Service层MQProducerService

/*** 发送mq工具类** @author hauhua* @DATE 2025/1/21*/
@Slf4j
public class MQProducerService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMsg(String topic, Object body) {Message<Object> build = MessageBuilder.withPayload(body).build();rocketMQTemplate.send(topic, build);}/*** 发送可靠消息,在事务提交后保证发送成功** @param topic* @param body*/@SecureInvokepublic void sendSecureMsg(String topic, Object body, Object key) {// 通过切面实现本地消息表,因为要在此方法执行前将数据保存到本地消息表,进行功能增强// 模拟发送mq消息有1/3概率失败if(RandomUtil.randomInt(3) >= 1){log.info("sendSecureMsg fail");throw new IllegalStateException();}Message<Object> build = MessageBuilder.withPayload(body).setHeader("KEYS", key).build();log.info("sendSecureMsg start");rocketMQTemplate.send(topic, build);log.info("sendSecureMsg end");}
}

(5)安全执行处理器SecureInvokeService

@Slf4j
@AllArgsConstructor
public class SecureInvokeService {public static final double RETRY_INTERVAL_MINUTES = 2D;private final SecureInvokeRecordDao secureInvokeRecordDao;private final Executor executor;// 每5秒执行一次定时任务,从数据库中获取所有等待重试的记录,并异步调用这些记录对应的方法@Scheduled(cron = "*/5 * * * * ?")public void retry() {List<SecureInvokeRecord> secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords();if (secureInvokeRecords.isEmpty()) {log.info("SecureInvokeService retry no record");return;}for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) {doAsyncInvoke(secureInvokeRecord);}}public void save(SecureInvokeRecord record) {secureInvokeRecordDao.save(record);}private void retryRecord(SecureInvokeRecord record, String errorMsg) {Integer retryTimes = record.getRetryTimes() + 1;SecureInvokeRecord update = new SecureInvokeRecord();update.setId(record.getId());update.setFailReason(errorMsg);update.setNextRetryTime(getNextRetryTime(retryTimes));update.setUpdateTime(new Date());if (retryTimes > record.getMaxRetryTimes()) {update.setStatus(SecureInvokeRecord.STATUS_FAIL);} else {update.setRetryTimes(retryTimes);}secureInvokeRecordDao.updateById(update);}private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16mreturn DateUtil.offsetMinute(new Date(), (int) waitMinutes);}private void removeRecord(Long id) {secureInvokeRecordDao.removeById(id);}public void invoke(SecureInvokeRecord record, boolean async) {boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive();//非事务状态,直接执行,不做任何保证。if (!inTransaction) {return;}//保存执行数据record.setStatus(SecureInvokeRecord.STATUS_WAIT);save(record);TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@SneakyThrows@Overridepublic void afterCommit() {//事务后执行if (async) {doAsyncInvoke(record);} else {doInvoke(record);}}});}public void doAsyncInvoke(SecureInvokeRecord record) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());doInvoke(record);});}public void doInvoke(SecureInvokeRecord record) {SecureInvokeVo secureInvokeVo = record.getSecureInvokeVo();try {// 构造method以及对应的类和method中的args参数SecureInvokeHolder.setInvoking();Class<?> beanClass = Class.forName(secureInvokeVo.getClassName());Object bean = SpringUtil.getBean(beanClass);List<String> parameterStrings = JsonUtils.toList(secureInvokeVo.getParameterTypes(), String.class);List<Class<?>> parameterClasses = getParameters(parameterStrings);Method method = ReflectUtil.getMethod(beanClass, secureInvokeVo.getMethodName(), parameterClasses.toArray(new Class[]{}));Object[] args = getArgs(secureInvokeVo, parameterClasses);// 执行方法,回调自己在事务提交后的逻辑method.invoke(bean, args);// 执行成功则删除secure_invoke_record对应的数据,说明消息已经发送成功removeRecord(record.getId());} catch (Throwable e) {log.error("SecureInvokeService invoke fail", e);// 执行失败,等待下次执行(用于重试,下一次执行时间会有对应的算法)retryRecord(record, e.getMessage());} finally {SecureInvokeHolder.invoked();}}@NotNullprivate Object[] getArgs(SecureInvokeVo secureInvokeVo, List<Class<?>> parameterClasses) {JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeVo.getArgs());Object[] args = new Object[jsonNode.size()];for (int i = 0; i < jsonNode.size(); i++) {Class<?> aClass = parameterClasses.get(i);args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass);}return args;}@NotNullprivate List<Class<?>> getParameters(List<String> parameterStrings) {return parameterStrings.stream().map(name -> {try {return Class.forName(name);} catch (ClassNotFoundException e) {log.error("SecureInvokeService class not fund", e);}return null;}).collect(Collectors.toList());}
}

4.2、代码执行流程

请添加图片描述
由于在代码中写死了模拟1/3概率发送消息失败,所以刚开始会将数据插入secure_invoke_record,然后通过TransactionSynchronizationManager执行后置处理,最后通过定时任务,扫描失败的任务,从secure_invoke_record取出数据利用MQ再次重试,并且删除secure_invoke_record表刚刚重试的数据,最后执行完成,达到消息最终一致性。
(1)数据库层面执行验证效果
刚开始1/3概率发送消息失败,通过切面将消息保存到本地消息表secure_invoke_record中。
请添加图片描述
可以看出创建时间为00:25,下一次重试时间为00:27,到了00:27定时任务启动,执行重试,成功后把这个本地消息删除,并重发MQ
请添加图片描述

自定义的topic也已经经过重试发送消息成功
在这里插入图片描述
(2)本地idea控制台日志层面分析
在这里插入图片描述
在这里插入图片描述

在时间为00:25发送失败,写入本地消息表。在重试时间为00:27,发送MQ消息成功。

4.3、项目结构

在这里插入图片描述
主要提交代码如下
在这里插入图片描述
提交MR地址

4.4、项目源码

项目源码demo-springboot-mybatisplus,欢迎大家star!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/7038.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Chrome插件:图片缩放为头像(128*128)

前置条件&#xff1a; 安装有chrome谷歌浏览器的电脑 使用步骤&#xff1a; 1.打开chrome扩展插件 2.点击管理扩展程序 3.加载已解压的扩展程序 4.选择对应文件夹 5.成功后会出现一个扩展小程序 6.点击对应小程序 7.使用小程序 8.拖拽成功后会自动保存到下载 代码&#xf…

idea maven本地有jar包,但还要从远程下载

idea 中&#xff0c;java 工程执行 maven reimport&#xff0c;报jar报无法下载。 我奇了个怪&#xff0c;我明明在本地仓库有啊&#xff0c;你非得从远程下载&#xff1f; 我从供应商那里拿来的&#xff0c;远程当然没有了。 这太奇葩了吧&#xff0c;折腾好久不行。 后来…

HTML<label>标签

例子 三个带标签的单选按钮&#xff1a; <form action"/action_page.php"> <input type"radio" id"html" name"fav_language" value"HTML"> <label for"html">HTML</label><br&…

【数据结构】_不带头非循环单向链表

目录 1. 链表的概念及结构 2. 链表的分类 3. 单链表的实现 3.1 SList.h头文件 3.2 SList.c源文件 3.3 Test_SList.c测试文件 关于线性表&#xff0c;已介绍顺序表&#xff0c;详见下文&#xff1a; 【数据结构】_顺序表-CSDN博客 本文介绍链表&#xff1b; 基于顺序表…

算法刷题笔记——图论篇

这里写目录标题 理论基础图的基本概念图的种类度 连通性连通图强连通图连通分量强连通分量 图的构造邻接矩阵邻接表 图的遍历方式 深度优先搜索理论基础dfs 与 bfs 区别dfs 搜索过程深搜三部曲所有可达路径广度优先搜索理论基础广搜的使用场景广搜的过程 岛屿数量孤岛的总面积沉…

“AI视觉贴装系统:智能贴装,精准无忧

嘿&#xff0c;朋友们&#xff01;今天我要跟你们聊聊一个特别厉害的技术——AI视觉贴装系统。这可不是普通的贴装设备&#xff0c;它可是融合了人工智能、计算机视觉和自动化控制等前沿科技的“智能贴装大师”。有了它&#xff0c;那些繁琐、复杂的贴装工作变得轻松又精准。来…

vim如何设置显示空白符

:set list 显示空白符 示例&#xff1a; :set nolist 不显示空白符 示例&#xff1a; &#xff08;vim如何使设置显示空白符永久生效&#xff1a;vim如何使相关设置永久生效-CSDN博客&#xff09;

Flutter android debug 编译报错问题。插件编译报错

下面相关内容 都以 Mac 电脑为例子。 一、问题 起因&#xff1a;&#xff08;更新 Android studio 2024.2.2.13、 Flutter SDK 3.27.2&#xff09; 最近 2025年 1 月 左右&#xff0c;我更新了 Android studio 和 Flutter SDK 再运行就会出现下面的问题。当然 下面的提示只是其…

AI导航工具我开源了利用node爬取了几百条数据

序言 别因今天的懒惰&#xff0c;让明天的您后悔。输出文章的本意并不是为了得到赞美&#xff0c;而是为了让自己能够学会总结思考&#xff1b;当然&#xff0c;如果有幸能够给到你一点点灵感或者思考&#xff0c;那么我这篇文章的意义将无限放大。 背景 随着AI的发展市面上…

Android Studio打包APK

1.导出APK安装包 如果是首次打包&#xff0c;Create new 单击蓝色对话框右边文件夹&#x1f4c2;图标 &#xff0c;选择密钥保存路径&#xff0c;然后在下方File name对话框中填写您想要名称&#xff0c;再点击OK回到密钥创建对话框。 在此对话框中填写密码&#xff08;Passwo…

ssh密钥登录GitHub时一直提示“Error: Permission denied (publickey)”

起因 环境&#xff1a;Windows10 背景&#xff1a;之前就是按照官方说明创建个rsa密钥&#xff0c;在git后台添加上&#xff0c;就行了&#xff0c;近期怎么添加怎么失败&#xff0c;总是“Error: Permission denied (publickey)”的提示&#xff01; 尝试 各种尝试&#xf…

【玩转全栈】----Django连接MySQL

阅前先赞&#xff0c;养好习惯&#xff01; 目录 1、ORM框架介绍 选择建议 2、安装mysqlclient 3、创建数据库 4、修改settings&#xff0c;连接数据库 5、对数据库进行操作 创建表 删除表 添加数据 删除数据 修改&#xff08;更新&#xff09;数据&#xff1a; 获取数据 1、OR…

软件质量与测试报告5-压力测试 JMeter 与 Badboy

A&#xff0e;百度搜索引擎压力测试 通过在Badboy下执行如下的测试场景来生成压力测试的脚本&#xff1a; a) 在Badboy的地址栏里面输入www.baidu.com&#xff0c;回车&#xff1b; b) 在右下区域打开的百度的主页上输入搜索关键字JMeter&#xff0c;回车&#xff1b; c) 在…

vim如何显示行号

:set nu 显示行号 :set nonu 不显示行号 &#xff08;vim如何使设置显示行号永久生效&#xff1a;vim如何使相关设置永久生效-CSDN博客&#xff09;

Python Typing: 实战应用指南

文章目录 1. 什么是 Python Typing&#xff1f;2. 实战案例&#xff1a;构建一个用户管理系统2.1 项目描述2.2 代码实现 3. 类型检查工具&#xff1a;MyPy4. 常见的 typing 用法5. 总结 在 Python 中&#xff0c;静态类型检查越来越受到开发者的重视。typing 模块提供了一种方式…

Linux的基本指令(上)

1.ls指令 语法&#xff1a;ls [选项] [目录或文件] 功能&#xff1a;对于⽬录&#xff0c;该命令列出该⽬录下的所有⼦⽬录与⽂件。对于⽂件&#xff0c;将列出⽂件名以及其他信息。 常用选项&#xff1a; -a 列出⽬录下的所有⽂件&#xff0c;包括以 . 开头的隐含⽂件。 -d 将…

【数据分享】1929-2024年全球站点的逐日平均能见度(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff01;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2024年全球气象站点…

算法每日双题精讲 —— 二分查找(山脉数组的峰顶索引,寻找峰值)

&#x1f31f;快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 &#x1f31f; 别再犹豫了&#xff01;快来订阅我们的算法每日双题精讲专栏&#xff0c;一起踏上算法学习的精彩之旅吧&#x1f4aa; 在算法的…

macOS如何进入 Application Support 目录(cd: string not in pwd: Application)

错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下&#xff1a; 拼写错误或路径错误&#xff1a;确保你输入的目录名称正确。目录名称是区分大小写的&#xff0c;因此请确保使用正确的大小写。正确的目录名…

如何为64位LabVIEW配置正确的驱动程序

在安装 64位 LabVIEW 后&#xff0c;确保驱动程序正确配置是关键。如果您首先安装了 32位 LabVIEW 和相关驱动&#xff0c;然后安装了 64位 LabVIEW&#xff0c;需要确保为 64位 LabVIEW 安装和配置适当的驱动程序&#xff0c;才能正常访问硬件设备。以下是详细步骤&#xff1a…