多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/331445.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开关电源AC-DC(15W 3-18V可调)

简介: 该模块使用PI的TNY268PN电源芯片制作的开关电源,实现最大功率15W 3-18V可调输出(更改反馈电阻)隔离式反激电源; 简介:该模块使用PI的TNY268PN电源芯片制作的开关电源,实现最大功率15W 3-18V可调输出(更改反馈电阻,现电路图输出5V)隔离式反激电源; 一、产品简…

论文阅读--CLIPasso

让计算机把真实图片抽象成简笔画&#xff0c;这个任务很有挑战性&#xff0c;需要模型捕获最本质的特征 以往的工作是找了素描的数据集&#xff0c;而且抽象程度不够高&#xff0c;笔画是固定好的&#xff0c;素描对象的种类不多&#xff0c;使得最后模型的效果十分受限 之所以…

云计算和大数据处理

文章目录 1.云计算基础知识1.1 基本概念1.2 云计算分类 2.大数据处理基础知识2.1 基础知识2.3 大数据处理技术 1.云计算基础知识 1.1 基本概念 云计算是一种提供资源的网络&#xff0c;使用者可以随时获取“云”上的资源&#xff0c;按需求量使用&#xff0c;并且可以看成是无…

面试八股之JVM篇3.5——垃圾回收——G1垃圾回收器

&#x1f308;hello&#xff0c;你好鸭&#xff0c;我是Ethan&#xff0c;一名不断学习的码农&#xff0c;很高兴你能来阅读。 ✔️目前博客主要更新Java系列、项目案例、计算机必学四件套等。 &#x1f3c3;人生之义&#xff0c;在于追求&#xff0c;不在成败&#xff0c;勤通…

优先级队列(堆)的实现

1.什么是优先级队列 队列是一种先进先出(FIFO)的数据结构&#xff0c;但有些情况下&#xff0c;操作的数据可能带有优先级&#xff0c;一般出队 列时&#xff0c;可能需要优先级高的元素先出队列&#xff0c;该中场景下&#xff0c;使用队列显然不合适&#xff0c;比如&#x…

Python线程

Python线程 1. 进程和线程 先来了解下进程和线程。 类比&#xff1a; 一个工厂&#xff0c;至少有一个车间&#xff0c;一个车间中至少有一个工人&#xff0c;最终是工人在工作。 一个程序&#xff0c;至少有一个进程&#xff0c;一个进程中至少有一个线程&#xff0c;最终…

不靠后端,前端也能搞定接口!

嘿&#xff0c;前端开发达人们&#xff01;有个超酷的消息要告诉你们&#xff1a;MemFire Cloud来袭啦&#xff01;这个神奇的东东让你们不用依赖后端小伙伴们&#xff0c;也能妥妥地搞定 API 接口。是不是觉得有点不可思议&#xff1f;但是事实就是这样&#xff0c;让我们一起…

141.字符串:重复的字符串(力扣)

题目描述 代码解决 class Solution { public:// 计算字符串s的next数组&#xff0c;用于KMP算法void getNext(int *next, const string& s){int j 0; // j是前缀的长度next[0] 0; // 初始化next数组&#xff0c;第一个字符的next值为0for (int i 1; i < s.size(); …

OpenHarmony 实战开发——一文总结ACE代码框架

一、前言 ACE_Engine框架是OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&#xff09;的UI开发框架&#xff0c;为开发者提供在进行应用UI开发时所必需的各种组件&#xff0c;以及定义这些组件的属性、样式、事件及方法&#xff0c;通过这些组件可以方便进行OpenHarmo…

AUTOMATIC1111/stable-diffusion-webui/stable-diffusion-webui-v1.9.3

配置环境介绍 目前平台集成了 Stable Diffusion WebUI 的官方镜像&#xff0c;该镜像中整合如下资源&#xff1a; GpuMall智算云 | 省钱、好用、弹性。租GPU就上GpuMall,面向AI开发者的GPU云平台 Stable Diffusion WebUI版本&#xff1a;v1.9.3 Python版本&#xff1a;3.10.…

插件:NGUI

一、版本 安装完毕后重启一下即可&#xff0c;否则可能创建的UI元素不生效 二、使用 Label文字 1、创建Canvs 2、只有根节点的这些脚本全部展开才能鼠标右键创建UI元素 3、选择字体 Sprite图片 1、选择图集 2、选择图集中的精灵 Panel容器 用来装UI的容器&#xff0c;一般UI…

从 0 实现一个文件搜索工具 (Java 项目)

背景 各文件系统下, 都有提供文件查找的功能, 但是一般而言搜索速度很慢 本项目仿照 everything 工具, 实现本地文件的快速搜索 实现功能 选择指定本地目录, 根据输入的信息, 进行搜索, 显示指定目录下的匹配文件信息文件夹包含中文时, 支持汉语拼音搜索 (全拼 / 首字母匹配…

Boss说,搞个深色B端系统。敢要就敢搞,宁被累死,不被吓死。

老规矩&#xff0c;先上文字说服&#xff08;洗脑&#xff09;自己&#xff0c;再附案例。 深色系B端系统是指在企业级应用中&#xff0c;使用深色主题的后台管理系统。这种设计风格主要以暗色调为主&#xff0c;如黑色、深灰色等&#xff0c;与传统的亮色主题相比&#xff0c…

Orangepi Zero2 linux系统摄像头设备文件名固定

文章目录 1. 寻找设备规则2. 使用udev规则修改挂载设备文件名称 问题&#xff1a; 在多次插拔usb摄像头或者在使用中不小心碰到或松了会导致设备文件名称变化&#xff0c;如从/dev/video1和/dev/video2变为/dev/video2和/dev/video3, 所以每次发生变化后都要充型修改代码或者重…

小红书无限加群脚本无需ROOT【使用简单无教程】

小红书无限加群脚本无需ROOT&#xff0c;包含了对应的小红书版本【使用简单无教程】 链接&#xff1a;https://pan.baidu.com/s/1HkLhahmHDFMKvqCC3Q3haA?pwd6hzf 提取码&#xff1a;6hzf

【Linux】TCP协议【中】{确认应答机制/超时重传机制/连接管理机制}

文章目录 1.确认应答机制2.超时重传机制&#xff1a;超时不一定是真超时了3.连接管理机制 1.确认应答机制 TCP协议中的确认应答机制是确保数据可靠传输的关键部分。以下是该机制的主要步骤和特点的详细解释&#xff1a; 数据分段与发送&#xff1a; 发送方将要发送的数据分成一…

Vue 3 组件基础与模板语法详解

title: Vue 3 组件基础与模板语法详解 date: 2024/5/24 16:31:13 updated: 2024/5/24 16:31:13 categories: 前端开发 tags: Vue3特性CompositionAPITeleportSuspenseVue3安装组件基础模板语法 Vue 3 简介 1. Vue 3 的新特性 Vue 3引入了许多新的特性&#xff0c;以提高框…

c++编程14——STL(3)list

欢迎来到博主的专栏&#xff1a;c编程 博主ID&#xff1a;代码小豪 文章目录 list成员类型构造、析构、与赋值iterator元素访问修改元素list的操作 list list的数据结构是一个链表&#xff0c;准确的说应该是一个双向链表。这是一个双向链表的节点结构&#xff1a; list的使用…

共享单车(八):数据库

实现后台数据库访问模块的框架&#xff0c;能够实现验证请求并响应&#xff08;支持数据库操作&#xff09;。 数据库设计 class SqlTabel //负责数据库表的创建 { public:SqlTabel(std::shared_ptr<MysqlConnection> sqlconn) :sqlconn_(sqlconn) {}bool CreateUserI…

超值分享50个DFM模型格式的素人直播资源,适用于DeepFaceLive的DFM合集

50直播模型&#xff1a;点击下载 作为直播达人&#xff0c;我在网上购买了大量直播用的模型资源&#xff0c;包含男模女模、明星脸、大众脸、网红脸及各种稀缺的路人素人模型。现在&#xff0c;我将这些宝贵的资源整理成合集分享给大家&#xff0c;需要的朋友们可以直接点击下…