项目中用到了多线程去批量处理一些数据,当时想当然认为只要方法上加上@Transactional注解就好了,实际并未达到想要的处理效果。特此去学习了下关于多线程事务回滚相关方案,参考了网上其他资料,这里整理并记录下学习历程。
站在巨人的肩膀上,我们可以看的更远!
多线程事务怎么回滚?
- 一、准备相关基础方法
- 1.线程池配置
- 2.list切分工具类
- 3.SqlSession工具类
- 4.员工实体类
- 5.员工EmployeeMapper
- 6.员工对应EmployeeMapper.xml
- 二、业务处理
- 1.EmployeeService接口
- 2.测试多线程事务实现类
- 3.员工Controller
- 三、方案验证
- 1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。![在这里插入图片描述](https://img-blog.csdnimg.cn/cb341ba2f3e146e69dc3e913f1b411f8.png)
- 2.EmployeeServiceImpl的saveThreadByTransactional方法
- 3.EmployeeServiceImpl的saveThreadRollBack方法
- 四、方案总结
- 1.方案总结
- 五.项目结构及下载
一、准备相关基础方法
这里以多线程、分批次插入数据库employee表为例子进行演示。
1.线程池配置
/*** 线程池配置*/
@Component
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize = 1000;int corePool = Math.min(10, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
2.list切分工具类
/*** list切分工具类*/
public class ListUtil {/*** 平均拆分list** @param source* @param n* @param <T>* @return*/public static <T> List<List<T>> AverageList(List<T> source, int n) {List<List<T>> result = new ArrayList<>();int remaider = source.size() % n;int number = source.size() / n;//偏移量int offset = 0;for (int i = 0; i < n; i++) {List<T> value;if (remaider > 0) {value = source.subList(i * number + offset, (i + 1) * number + offset + 1);remaider--;offset++;} else {value = source.subList(i * number + offset, (i + 1) * number + offset);}result.add(value);}return result;}
}
3.SqlSession工具类
/*** SqlSession工具类*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
}
4.员工实体类
/*** 员工*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "employee")
public class Employee {@TableField(value = "employee_id")private Integer employeeId;@TableField(value = "employee_name")private String employeeName;@TableField(value = "age")private Integer age;
}
5.员工EmployeeMapper
@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {int saveBatchRollBack(List Employee);
}
6.员工对应EmployeeMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.it.mapper.EmployeeMapper"><resultMap id="BaseResultMap" type="com.it.entity.Employee"><!--@Table `Employee`--><result column="employee_id" jdbcType="INTEGER" property="employee_id" /><result column="employee_name" jdbcType="VARCHAR" property="employee_name" /><result column="age" jdbcType="INTEGER" property="age" /></resultMap><sql id="Base_Column_List">employee_id, employee_name, age</sql><insert id="saveBatchRollBack">insert intoemployee (employee_id,age,employee_name)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName})</foreach></insert>
</mapper>
二、业务处理
1.EmployeeService接口
public interface EmployeeService extends IService<Employee> {/*** 使用@Transactional测试多线程回滚失败*/void saveThreadByTransactional(List<Employee> employeeList);/*** 使用手动操作事务测试多线程回滚成功*/void saveThreadRollBack(List<Employee> employeeList) throws SQLException;
}
2.测试多线程事务实现类
/*** 测试多线程事务*/
@Service
@Slf4j
public class EmployeeServiceImpl extends ServiceImpl<EmployeeMapper, Employee> implements EmployeeService {@ResourceSqlContext sqlContext;/*** 多线程环境下Transactional失效场景** @param employeeList*/@Override@Transactional(rollbackFor = Exception.class)public void saveThreadByTransactional(List<Employee> employeeList) {try {// 先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);// 获取线程池ExecutorService executorService = ExecutorConfig.getThreadPool();// 拆分数据,拆分6份List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);// 执行的线程Thread[] threadArray = new Thread[lists.size()];// 监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i = 0; i < lists.size(); i++) {if (i == lists.size() - 1) {// 最后一个atomicBoolean设置为falseatomicBoolean.set(false);}List<Employee> list = lists.get(i);threadArray[i] = new Thread(() -> {try {// 最后一个线程抛出异常if (!atomicBoolean.get()) {throw new RuntimeException("最后一个线程添加时抛出异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);} finally {countDownLatch.countDown();}});}for (int i = 0; i < lists.size(); i++) {executorService.execute(threadArray[i]);}// 当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("employee列表添加完成");} catch (Exception e) {log.info("error", e);throw new RuntimeException("employee列表添加过程出现异常");}}/*** 使用sqlSession控制手动提交事务** @param employeeList*/@Overridepublic void saveThreadRollBack(List<Employee> employeeList) throws SQLException {{// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();//拆分listList<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i = 0; i < lists.size(); i++) {if (i == lists.size() - 1) {atomicBoolean.set(false);}List<Employee> list = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()) {throw new Exception("出现异常");}return employeeMapper.saveBatchRollBack(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future : futures) {//如果有一个执行不成功,则全部回滚if (future.get() <= 0) {connection.rollback();return;}}connection.commit();System.out.println("添加完毕");} catch (Exception e) {connection.rollback();log.info("error", e);} finally {connection.close();}}
}
3.员工Controller
@RestController
@RequestMapping(value = "/employee")
public class EmployeeController {@AutowiredEmployeeService employeeService;@PostMapping("/saveThreadByTransactional")public ResponseEntity saveThreadByTransactional() {// 模拟需要插入12名员工到数据库List<Employee> list = IntStream.range(0, 12).mapToObj(i -> {Employee employee = new Employee();employee.setEmployeeId(i);employee.setEmployeeName("三丰" + i);employee.setAge(i + 100);return employee;}).collect(Collectors.toList());employeeService.saveThreadByTransactional(list);return new ResponseEntity<>(HttpStatus.OK);}@PostMapping("/saveThreadRollBack")public ResponseEntity saveThreadRollBack() throws SQLException {// 模拟需要插入12名员工到数据库List<Employee> list = IntStream.range(0, 12).mapToObj(i -> {Employee employee = new Employee();employee.setEmployeeId(i);employee.setEmployeeName("三丰" + i);employee.setAge(i + 100);return employee;}).collect(Collectors.toList());employeeService.saveThreadRollBack(list);return new ResponseEntity<>(HttpStatus.OK);}
}
三、方案验证
1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。
2.EmployeeServiceImpl的saveThreadByTransactional方法
该方法通过使用@Transactional注解尝试处理多线程事务回滚。
利用postman测试saveThreadByTransactional接口
发现控制台显示我们自定义的线程报错
查询数据库Employee表,发现代码中this.getBaseMapper().delete(null);
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚(数据库中表数据也已经被删除完成),则证明@Transactional注解并不能在多线程下进行事务回滚!
3.EmployeeServiceImpl的saveThreadRollBack方法
该方法通过使用sqlSession控制,手动提交事务,在多线程下进行事务回滚。
利用postman测试saveThreadRollBack接口。
发现控制台显示我们自定义的线程报错。
查询数据库Employee表,发现数据并未被删除,证明多线程执行过程中失败了,事务被回滚了。
四、方案总结
1.方案总结
在Spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效。
如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
通过使用sqlSession控制手动提交事务,可以达到主线程和子线程数据事务回滚。
五.项目结构及下载
源码地址springboot-cacheable,创作不易,欢迎star哦~
参考资料
支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!
多线程事务怎么回滚?
多线程如何实现事务回滚?一招帮你搞定!