结合seata和2PC,简单聊聊seata源码

当前代码分析基于seata1.6.1

整体描述

整体代码流程可以描述为

  1. TM开启全局事务,会调用TC来获取XID。
  2. TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
  3. 在获取到XID后,会执行业务逻辑。
  4. 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
  5. 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
  6. TC接收RM端提交的分支事务,存储到brand_table中。
  7. 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
  8. TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
  9. TC 通知RM,删除 undo log 日志。

源码解析

系统启动初始化

主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器

在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。

GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下

@Override
public void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {//创建客户端的方法initClient();}
}private void initClient() {...其他校验//创建TM客户端并且初始化TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);...//创建RM客户端并且初始化RMClient.init(applicationId, txServiceGroup);...}

同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。

wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {...} else {...//生成一个全局事务处理的拦截器,if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}...} catch (Exception exx) {throw new RuntimeException(exx);}}

TM端开启全局事务

开启全局事务

GlobalTransactionalInterceptor中的逻辑

忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute

在该类中,核心代码如下

public Object execute(TransactionalExecutor business) throws Throwable {...try {...try {//开启全局事务beginTransaction(txInfo, tx);Object rs;try {//进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {//当出现业务逻辑异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//所有分支事务无异常,提交全局事务commitTransaction(tx, txInfo);return rs;} finally {...}} finally {...}
}

开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id

TC端接收全局事务请求后

记录全局事务

在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

在该方法中会去调用core.begin方法,进入xid获取流程

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//开始获取xidresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}

忽略其他流程,关注核心,其调用往下的链路为

core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法

在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。

方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现

在writeSession方法中,会插入全局事务表数据,代码如下

/*** 插入全局事务表* 表为:global_table* @param globalTransactionDO the global transaction do* @return*/
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);//插入xidps.setString(index++, globalTransactionDO.getXid());//插入事务idps.setLong(index++, globalTransactionDO.getTransactionId());//插入事务状态,begin  = 1ps.setInt(index++, globalTransactionDO.getStatus());//插入应用id,一般是服务名ps.setString(index++, globalTransactionDO.getApplicationId());//插入事务组ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;//插入事务名称ps.setString(index++, transactionName);//插入超时时间ps.setInt(index++, globalTransactionDO.getTimeout());//插入事务开始时间ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}

RM执行业务代码,并且提交事务

代理数据源,生成undo log,并且通知TC

在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强

这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute

@Override
public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//一般在初始状态下,这个autoCommit是trueif (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}
}

然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下

executeAutoCommitTrue 方法代码

 protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//设置提交方式为手动提交connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {//执行sql,并且准备前置镜像和后置镜像T result = executeAutoCommitFalse(args);//提交本地事务(内部会和RM进行交互)connectionProxy.commit();return result;});} catch (Exception e) {...异常处理} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}
}

executeAutoCommitFalse方法代码

protected T executeAutoCommitFalse(Object[] args) throws Exception {//获取前置镜像TableRecords beforeImage = beforeImage();//执行目标sql,注意,这边执行完后,事务是未提交的T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//获取后置镜像TableRecords afterImage = afterImage(beforeImage);//准备undo_logprepareUndoLog(beforeImage, afterImage);return result;
}

processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法

 private void processGlobalTransactionCommit() throws SQLException {try {//作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {//生成undo_log日志,用于事务回滚UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//提交undo_log 回滚日志和本地事务,事务在这里已经提交了targetConnection.commit();} catch (Throwable ex) {...异常处理}...其他处理
}

TM提交全局事务

进行提交就是向TC发起请求,相关代码如下

@Override
public void commit() throws TransactionException {//判断当前角色,只有TM才能执行if (role == GlobalTransactionRole.Participant) {...return;}//XID不能为空assertXIDNotNull();int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//可重试的执行,最多可执行5次while (retry > 0) {try {retry--;//向tc发起调用status = transactionManager.commit(xid);break;} catch (Throwable ex) {...}}} finally {...}...
}

TC处理全局事务

TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());//设置状态为异步提交状态response.setGlobalStatus(core.commit(request.getXid()));}

在AT模式下,事务的提交为异步的方式

public GlobalStatus commit(String xid) throws TransactionException {//获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {//如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等return GlobalStatus.Finished;}...boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();//判断是否可以异步提交,AT模式下可以异步提交if (globalSession.canBeCommittedAsync()) {//AT模式下异步事务提交globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {...}}return false;});...
}

最终将事务状态设置为异步提交

    public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());//设置事务状态为异步提交,这里在设置为异步提交后就不管了this.setStatus(GlobalStatus.AsyncCommitting);SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}

TC异步执行全局事务commit

核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init

该方法会异步的去进行处理,每秒执行一次

    /*** Init.*/public void init() {...//异步处理的部分//会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除//操作完了之后,向RM进行通知,进行undo_log的删除asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);...}

未完待续...

以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/448156.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

conda创建的新环境不干净!一定要注意!

总是出现明明是不同的环境&#xff0c;但是总是出现包交叉混用的问题&#xff0c;导致跑很多模型总是出现改了这个环境的包&#xff0c;那个环境又用不了了。就像下面这样&#xff0c;明明激活的是pyskl&#xff0c;安装mediapipe包显示在thwircamera中索引到就显示Requirement…

postgresql 安装

一、下载 PostgreSQL: File Browser 下载地址 PostgreSQL: File Browser 上传到服务器,并解压 二、安装依赖 yum install -y perl-ExtUtils-Embed readline-devel zlib-devel pam-devel libxml2-devel libxslt-devel openldap-devel 创建postgresql 和目录 useradd …

『Mysql集群』Mysql高可用集群之主从复制 (一)

Mysql主从复制模式 主从复制有一主一从、主主复制、一主多从、多主一从等多种模式. 我们可以根据它们的优缺点选择适合自身企业情况的主从复制模式进行搭建 . 一主一从 主主复制 (互为主从模式): 实现Mysql多活部署 一主多从: 提高整个集群的读能力 多主一从: 提高整个集群的…

一、定时器的时钟来源

计数器的时钟选择8个时钟源&#xff0c;可以分成4类: 一、来自RCC的内部时钟TIMx CLK 二、芯片内部其他定时器的触发输入ITR 使用某一个定时器作为另外一个定时器的分频 ITR1、ITR2、ITR3和ITR4 三、外部时钟源模式1&#xff1a; 外部捕获引脚上的边沿信号 TI1FP…

【jeston】torch相关环境安装

参考&#xff1a;玩转NVIDIA Jetson &#xff08;25&#xff09;— jetson 安装pytorch和torchvision 我的jeston信息&#xff1a; torch install 安装环境 conda create -n your_env python3.8 conda activate your_envpytorch_for_jeston 安装.whl文件 验证&#xff1…

循环神经网络(Recurrent Neural Network,RNN)

简介&#xff1a;个人学习分享&#xff0c;如有错误&#xff0c;欢迎批评指正。 一. 核心理念 循环神经网络&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;是一类专门用于处理序列数据的神经网络架构。其独特之处在于能够处理输入序列中元素的时序关系&…

STM32定时器

目录 STM32定时器概述 STM32基本定时器 基本定时器的功能 STM32基本定时器的寄存器 STM32通用定时器 STM32定时器HAL库函数 STM32定时器概述 从本质上讲定时器就是“数字电路”课程中学过的计数器&#xff08;Counter&#xff09;&#xff0c;它像“闹钟”一样忠实地为处…

41 C 语言共用体:共用体数据类型、共用体变量、访问共用体成员、与结构体的区别

目录 1 什么是共用体 2 共用体与结构体的区别 3 声明共用体类型 4 声明共用体变量 5 共用体内存分析 6 共用体成员的获取和赋值 7 综合案例 7.1 共同体特点演示 7.2 使用共用体存储学生和教师信息 1 什么是共用体 共用体&#xff08;Union&#xff09;是一种特殊的数据…

大型企业软件开发是什么样子的? - Web Dev Cody

引用自大型企业软件开发是什么样子的&#xff1f; - Web Dev Cody_哔哩哔哩_bilibili 一般来说 学技术的时候 我们会关注 开发语言特性 &#xff0c;各种高级语法糖&#xff0c;底层技术 但是很少有关注到企业里面的开发流程&#xff0c;本着以终为始&#xff08;以就业为导向…

OpenCV高级图形用户界面(8)在指定的窗口中显示一幅图像函数imshow()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在指定的窗口中显示一幅图像。 函数 imshow 在指定的窗口中显示一幅图像。如果窗口是以 cv::WINDOW_AUTOSIZE 标志创建的&#xff0c;图像将以原…

An.如何在an中截取音频片段

如何在an中截取音频片段 在an动画制作过程中&#xff0c;部分片段需要插入音乐&#xff0c;如果想要插入一首歌曲的其中一小节&#xff0c;打开音频编辑软件操作就很麻烦&#xff0c;不妨直接在an中操作&#xff1a; 以这首节气歌为例&#xff0c;前奏太长需要剪掉前面的部分 …

TOGAF 9.2 与 TOGAF 10 的对比分析:架构演进之路

TOGAF 9.2 与 TOGAF 10 的对比分析&#xff1a;架构演进之路 前言 TOGAF&#xff08;The Open Group Architecture Framework&#xff09;自诞生以来&#xff0c;已成为企业架构&#xff08;EA&#xff09;领域的全球标准框架。随着时代的发展&#xff0c;TOGAF也在不断进化&…

基于SpringBoot+Vue+uniapp的在线招聘平台的详细设计和实现

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…

第二课:Python入门学习之开发工具的安装

今天我们进行Python开发工具的安装&#xff0c;其实网上的开发工具是很多的&#xff0c;每个人的习惯也都不一样&#xff0c;我们先去官网下载一个比较常用的吧。废话不多开始操作。 第一步&#xff1a;打开官网地址&#xff1a;https://www.jetbrains.com.cn/ 第二步&#xf…

面试题:Redis(三)

1. 面试题 背景 问题&#xff0c;上面业务逻辑你用java代码如何写&#xff1f; 2. 缓存双写一致性谈谈你的理解&#xff1f; 3. 双检加锁策略 多个线程同时去查询数据库的这条数据&#xff0c;那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。 其他的线程走到这…

微知-Mellanox驱动中的iSCSI是什么?有哪三种网络存储有哪三种?iSER是什么?(iSCSI协议(总线),SAN 存储区域网络)

背景 本文根据Mellanox网卡驱动中关于iSCSI模块&#xff0c;来介绍iSCSI是什么&#xff1f;该技术发展演进背景&#xff1f; 关于iSCSI iSCSI是一种协议&#xff0c;SCSI是总线。比如常说的SAS&#xff08;Serial Attach SCSI&#xff09;存储盘对比与家用的SATA&#xff0…

Uiautomator2与weditor配置一直报错咋办

作者在配置这两个的时候绞尽脑汁了&#xff0c;u2的init总是报错并且无法自动在手机上安装atx&#xff0c;weditor可以打开但是只要对元素操作或者任意操作就会让你去重新init&#xff0c;搞得作者焦头烂额&#xff0c;而且网上各种各样的报错信息眼花缭乱&#xff0c;作者几乎…

NVIDIA NIM平台如何打造AI图表识别系统

NVIDIA NIM是一套易于使用的推理微服务&#xff0c;旨在加速企业中性能优化的生成式 AI 模型的部署。 NIM 推理微服务可以部署在任何地方&#xff0c;从工作站和本地到云&#xff0c;提供企业控制自己的部署选择并确保数据安全。它还提供行业领先的延迟和吞吐量&#xff0c;实现…

使用Windows创建一个MFC应用【带界面】

MFC使用教程【对初学者保姆型友好&#xff01;】 目录 前提条件 1&#xff1a;创建MFC应用程序 2. 项目结构解读 引用 外部依赖项 头文件 源文件 资源文件 文件功能详解 项目的主要流程 步骤2&#xff1a;配置OpenCV 安装OpenCV 包含目录与库文件 步骤3&#xff1…

云卓H30:引领科技与性能的完美融合!

在科技日新月异的今天&#xff0c;一款集高性能与便捷操作于一体的手持地面站成为了无人机、机器人等智能设备的得力助手。云卓H30手持地面站&#xff0c;凭借其搭载的高通骁龙660处理器&#xff0c;在多个适用场景中展现出了卓越的实力。 高通骁龙660&#xff0c;这款先进的移…