当前代码分析基于seata1.6.1
整体描述
整体代码流程可以描述为
- TM开启全局事务,会调用TC来获取XID。
- TC在接收到通知后,会生成XID,然后会将当前全局事务保存到global_table表中,并且返回XID。
- 在获取到XID后,会执行业务逻辑。
- 执行业务逻辑的时候,如果发生了增删改,则会对增删改语句做增强。
- 获取前置镜像数据---执行sql,不提交事务--获取后置镜像---准备undoLog---作为RM向TC提交事务分支---生成undo_log日志---提交本地事务,注意,在这里,本地事务已经提交了。只是有undo_log可用于回滚。
- TC接收RM端提交的分支事务,存储到brand_table中。
- 当全局分支事务都执行完成,TM会向TC提起全局事务提交的请求。
- TC接收到请求后,删除全局事务和分支事务(global_table 和 brand_table)。
- TC 通知RM,删除 undo log 日志。
源码解析
系统启动初始化
主要完成两个事情:初始化 TM和RM客户端;创建方法拦截器
在客户端中,核心配置类是SeataAutoConfiguration,在这个类中初始化了一个核心的扫描器GlobalTransactionScanner。
GlobalTransactionScanner 全局事务扫描器,实现了InitializingBean接口,如果继承了该接口,spring会在完成DI之后,调用afterPropertiesSet方法,在该方法中完成了对TM客户端和RM客户端的创建,代码如下
@Override
public void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {//创建客户端的方法initClient();}
}private void initClient() {...其他校验//创建TM客户端并且初始化TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);...//创建RM客户端并且初始化RMClient.init(applicationId, txServiceGroup);...}
同时,GlobalTransactionScanner 继承了AbstractAutoProxyCreator 抽象类,在类完成初始化之后,会调用父类的 postProcessAfterInitialization方法,在父类的方法中,会调用该类重写的一个wrapIfNecessary方法。
wrapIfNecessary 方法会生成一个 GlobalTransactionalInterceptor 全局事务拦截器。
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {...} else {...//生成一个全局事务处理的拦截器,if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}...} catch (Exception exx) {throw new RuntimeException(exx);}}
TM端开启全局事务
开启全局事务
GlobalTransactionalInterceptor中的逻辑
忽略中间调用过程,最终会走到io.seata.tm.api.TransactionalTemplate#execute
在该类中,核心代码如下
public Object execute(TransactionalExecutor business) throws Throwable {...try {...try {//开启全局事务beginTransaction(txInfo, tx);Object rs;try {//进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {//当出现业务逻辑异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//所有分支事务无异常,提交全局事务commitTransaction(tx, txInfo);return rs;} finally {...}} finally {...}
}
开启全局事务的方法就在beginTransaction中,继续往下会去TC中获取一个XID,就是全局事务id
TC端接收全局事务请求后
记录全局事务
在server层的代码中,全局事务的入口方法为 io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin
在该方法中会去调用core.begin方法,进入xid获取流程
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//开始获取xidresponse.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}
}
忽略其他流程,关注核心,其调用往下的链路为
core.begin--session.begin--lifecycleListener.onBegin--找到子类方法--this.addGlobalSession--找子类方法
在这里,可以找到三个实现类,分别是代表了数据库、文件和redis实现,文件是默认实现,其他几种都需要进行配置,我们针对数据库实现进行描述。
方法路径为:io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession
然后调用方法transactionStoreManager.writeSession,transactionStoreManager是一个接口,同样有三种实现
在writeSession方法中,会插入全局事务表数据,代码如下
/*** 插入全局事务表* 表为:global_table* @param globalTransactionDO the global transaction do* @return*/
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);//插入xidps.setString(index++, globalTransactionDO.getXid());//插入事务idps.setLong(index++, globalTransactionDO.getTransactionId());//插入事务状态,begin = 1ps.setInt(index++, globalTransactionDO.getStatus());//插入应用id,一般是服务名ps.setString(index++, globalTransactionDO.getApplicationId());//插入事务组ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;//插入事务名称ps.setString(index++, transactionName);//插入超时时间ps.setInt(index++, globalTransactionDO.getTimeout());//插入事务开始时间ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}
}
RM执行业务代码,并且提交事务
代理数据源,生成undo log,并且通知TC
在seata中,需要配置数据源代理,这个代理会在执行增删改查的时候,对操作进行增强
这里核心需要关注的方法是io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#doExecute
@Override
public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//一般在初始状态下,这个autoCommit是trueif (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}
}
然后会调用 executeAutoCommitTrue 方法,该方法主要做了几个事情,分别是:获取前置镜像和后置镜像,并且制作undo_log;执行目标sql和插入undo_log;作为RM和TC进行交互,提交分支事务;以及提交事务。代码如下
executeAutoCommitTrue 方法代码
protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//设置提交方式为手动提交connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {//执行sql,并且准备前置镜像和后置镜像T result = executeAutoCommitFalse(args);//提交本地事务(内部会和RM进行交互)connectionProxy.commit();return result;});} catch (Exception e) {...异常处理} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}
}
executeAutoCommitFalse方法代码
protected T executeAutoCommitFalse(Object[] args) throws Exception {//获取前置镜像TableRecords beforeImage = beforeImage();//执行目标sql,注意,这边执行完后,事务是未提交的T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//获取后置镜像TableRecords afterImage = afterImage(beforeImage);//准备undo_logprepareUndoLog(beforeImage, afterImage);return result;
}
processGlobalTransactionCommit 方法,该方法就是connectionProxy.commit()最终指向的方法
private void processGlobalTransactionCommit() throws SQLException {try {//作为RM,向TC发起请求,注册分支事务,会插入数据到TC的mysql表中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {//生成undo_log日志,用于事务回滚UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//提交undo_log 回滚日志和本地事务,事务在这里已经提交了targetConnection.commit();} catch (Throwable ex) {...异常处理}...其他处理
}
TM提交全局事务
进行提交就是向TC发起请求,相关代码如下
@Override
public void commit() throws TransactionException {//判断当前角色,只有TM才能执行if (role == GlobalTransactionRole.Participant) {...return;}//XID不能为空assertXIDNotNull();int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//可重试的执行,最多可执行5次while (retry > 0) {try {retry--;//向tc发起调用status = transactionManager.commit(xid);break;} catch (Throwable ex) {...}}} finally {...}...
}
TC处理全局事务
TC在接收到提交请求后,会由方法 io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit 进行处理。
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());//设置状态为异步提交状态response.setGlobalStatus(core.commit(request.getXid()));}
在AT模式下,事务的提交为异步的方式
public GlobalStatus commit(String xid) throws TransactionException {//获取全局session,不同模式获取方式不同,如果是db,则会从数据库获取GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {//如果获取不到session,返回已完成状态,一般在调用超时的时候会发生,这样也可以保证幂等return GlobalStatus.Finished;}...boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();//判断是否可以异步提交,AT模式下可以异步提交if (globalSession.canBeCommittedAsync()) {//AT模式下异步事务提交globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {...}}return false;});...
}
最终将事务状态设置为异步提交
public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());//设置事务状态为异步提交,这里在设置为异步提交后就不管了this.setStatus(GlobalStatus.AsyncCommitting);SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}
TC异步执行全局事务commit
核心逻辑为 io.seata.server.coordinator.DefaultCoordinator#init
该方法会异步的去进行处理,每秒执行一次
/*** Init.*/public void init() {...//异步处理的部分//会从global中,每次取100条进行处理,并且删除这100条数据,然后遍历brand_table,根据global_table取删除//操作完了之后,向RM进行通知,进行undo_log的删除asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);...}
未完待续...
以下是经过注释的源码地址:seata: Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务 - Gitee.com