多线程事务如何回滚?

背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/**  线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService =  newThreadPool();}}}return executorService;}private static  ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}

示例事务不成功操作

/*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);threadArray[i] =  new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}

数据库中存在一条数据:

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/***   测试多线程事务.* @throws InterruptedException*/@Testpublic  void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}

测试结果:

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

 @ResourceSqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>

数据库中一条数据:

测试结果:抛出异常,

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

成功操作示例:

 @Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list  = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}

测试结果:

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248525.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

构建基于Flask的跑腿外卖小程序

跑腿外卖小程序作为现代生活中的重要组成部分&#xff0c;其技术实现涉及诸多方面&#xff0c;其中Web开发框架是至关重要的一环。在这篇文章中&#xff0c;我们将使用Python的Flask框架构建一个简单的跑腿外卖小程序的原型&#xff0c;展示其基本功能和实现原理。 首先&…

echarts:获取省、市、区/县、镇的地图数据

目录 第一章 前言 第二章 获取地图的数据&#xff08;GeoJSON格式&#xff09; 2.1 获取省、市、区/县地图数据 2.2 获取乡/镇/街道地图数据 第一章 前言 需求&#xff1a;接到要做大屏的需求&#xff0c;其中需要用echarts绘画一个地图&#xff0c;但是需要的地图是区/县…

Linux浅学笔记03

目录 有关root的命令 用户和用户组 用户组管理&#xff1a;&#xff08;以下需要root用户执行&#xff09; 创建用户组: 删除用户组&#xff1a; 用户管理&#xff1a;&#xff08;以下需要root用户执行&#xff09; 创建用户&#xff1a; 删除用户&#xff1a; 查看用…

掌握 Android JNI 基础

写在前面 最近在看一些底层源码&#xff0c;发现 JNI 这块还是有必要系统的看一下&#xff0c;索性就写一写博客&#xff0c;加深加深印象&#x1f37b; 本文重点聊一聊一些干货&#xff0c;避免长篇大论 JNI 概述 JNI 是什么&#xff1f; 定义&#xff1a;Java Native In…

Mysql流程控制函数

任何一门语言都有顺序结构 分支结构 循环结构 流程处理函数可以根据不同的条件&#xff0c;执行不同的处理流程&#xff0c;可以在 SQL 语句中实现不同的条件选择。 MySQL 中的流程处理函数主要包括 IF() 、 IFNULL() 和 CASE() 函数。 IF(value,value1,value2)函数举例 IF…

Qt无边框窗口拖拽和阴影

先看下效果&#xff1a; 说明 自定义窗口控件的无边框,窗口事件由于没有系统自带边框,无法实现拖拽拉伸等事件的处理,一种方法就是重新重写主窗口的鼠标事件&#xff0c;一种时通过nativeEvent事件处理。重写事件相对繁琐,我们这里推荐nativeEvent处理。注意后续我们在做win平…

微信小程序 仿微信聊天界面

1. 需求效果图 2. 方案 为实现这样的效果&#xff0c;首先要解决两个问题&#xff1a; 2.1.点击输入框弹出软键盘后&#xff0c;将已有的少许聊天内容弹出&#xff0c;导致看不到的问题 点击输入框弹出软键盘后&#xff0c;将已有的少许聊天内容弹出&#xff0c;导致看不到的问…

算法面试八股文『 基础知识篇 』

博客介绍 近期在准备算法面试&#xff0c;网上信息杂乱不规整&#xff0c;出于强迫症就自己整理了算法面试常出现的考题。独乐乐不如众乐乐&#xff0c;与其奖励自己&#xff0c;不如大家一起嗨。以下整理的内容可能有不足之处&#xff0c;欢迎大佬一起讨论。 PS&#xff1a;…

EPSON RC 机器人-第一个程序

创建项目 有机械人且用USB线连接好。可以USB。没有真机的选择 C4 Sample 可以运行程序。 否刚会提示【不能连接到控制器&#xff0c;未安装USB驱动器】 代码 按F5打开运行窗口 再点【开始】 点 【是】&#xff0c;查看运行结果

手把手教你使用Flask搭建ES搜索引擎(实战篇)

目录 一、引言 二、准备工作 三、搭建Flask应用程序 四、创建索引并插入数据 五、运行应用程序和测试搜索功能 一、引言 随着互联网的发展&#xff0c;搜索引擎已经成为我们获取信息的重要工具。然而&#xff0c;传统的搜索引擎如Google、Baidu等&#xff0c;虽然功能强大…

C语言之刷到的怪题(i与sizeof(i)比较大小)

这个题目一般都是选择输出<。为什么呢&#xff1f;因为i是一个全局变量&#xff0c;并且没有初始化&#xff0c;那么i的值就等于0。i--之后就是-1了。而sizeof(i)求出的就是整形变量对应的大小4个字节。-1<4&#xff0c;因此就选择 输出<。其实不然&#xff0c;这个si…

1.迭代与递归 - JS

迭代与递归是函数进阶的第一个门槛。迭代就是对已知变量反复赋值变换&#xff1b;递归就是函数体内调用自身。 迭代 一个迭代是就是一个循环&#xff0c;根据迭代式对变量反复赋值。 求近似根&#xff08;切线法&#xff09;&#xff1b; 迭代描述&#xff1a; x 0 x_0 x0…

mysql之基本查询

基本查询 一、SELECT 查询语句 一、SELECT 查询语句 本文所用案例已上传资源 查询所有列 1 SELECT *FORM emp;查询指定字段 SELECT empno,ename,job FROM emp;给字段取别名 SELECT empno 员工编号 FROM emp; SELECT empno 员工编号,ename 姓名,job 岗位 FROM emp; SELECT …

Servlet过滤器个监听器

过滤器和监听器 过滤器 什么是过滤器 当浏览器向服务器发送请求的时候&#xff0c;过滤器可以将请求拦截下来&#xff0c;完成一些特殊的功能&#xff0c;比如&#xff1a;编码设置、权限校验、日志记录等。 过滤器执行流程 Filter实例 package com.by.servlet;import jav…

Codeforces Round 799 (Div. 4)

目录 A. Marathon B. All Distinct C. Where’s the Bishop? D. The Clock E. Binary Deque F. 3SUM G. 2^Sort H. Gambling A. Marathon 直接模拟 void solve() {int ans0;for(int i1;i<4;i) {cin>>a[i];if(i>1&&a[i]>a[1]) ans;}cout<&l…

开源博客项目Blog .NET Core源码学习(8:EasyCaching使用浅析)

开源博客项目Blog使用EasyCaching模块实现缓存功能&#xff0c;主要是在App.Framwork项目中引用了多类包&#xff0c;包括内存缓存&#xff08;EasyCaching.InMemory&#xff09;、Redis缓存&#xff08;EasyCaching.CSRedis&#xff09;&#xff0c;同时支持多种序列化方式&am…

人脸识别技术在网络安全中有哪些应用前景?

人脸识别技术在网络安全中有广泛的应用前景。以下是一些主要的应用方向&#xff1a; 1. 身份验证和访问控制&#xff1a;人脸识别可以用作一种更安全和方便的身份验证方法。通过将用户的人脸与事先注册的人脸进行比对&#xff0c;可以实现强大的身份验证&#xff0c;避免了传统…

uniapp微信小程序-请求二次封装(直接可用)

一、请求封装优点 代码重用性&#xff1a;通过封装请求&#xff0c;你可以在整个项目中重用相同的请求逻辑。这样一来&#xff0c;如果 API 发生变化或者需要进行优化&#xff0c;你只需在一个地方修改代码&#xff0c;而不是在每个使用这个请求的地方都进行修改。 可维护性&a…

Pytest测试用例参数化

pytest.mark.parametrize(参数名1,参数名2...参数n, [(参数名1_data1,参数名2_data1...参数名n_data1),(参数名1_data2,参数名2_data2...参数名n_data2)]) 场景&#xff1a; 定义一个登录函数test_login,传入参数为name,password&#xff0c;需要用多个账号去测试登录功能 # …

DC-磁盘配额(23国赛真题)

2023全国职业院校技能大赛网络系统管理赛项–模块B&#xff1a;服务部署&#xff08;WindowServer2022&#xff09; 文章目录 DC-磁盘配额题目配置步骤验证查看DC2驱动器C:\的磁盘配额&#xff0c;限制磁盘空间&#xff0c;警告等级等配置 DC-磁盘配额 题目 在DC2驱动器C:\上…