【Seata源码学习 】篇五 注册分支事务

【Seata源码学习 】篇五 分支事务注册

1.远程服务调用绑定XID

回到事务模版方法类TransactionalTemplate中

                beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行执行拦截器链路rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)// 执行异常回滚的前后钩子方法// 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息// 并记录TC返回的当前事务状态StatuscompleteTransactionAfterThrowing(txInfo, tx, ex);throw ex;}

beginTransaction 开启全局事务我们已经在上一篇看过了,最终会将TC返回到XID绑定到RootContext中,并且TC会将全局事务会话持久化,通常我们使用DB,那么将会往seata数据库的global_table中插入一条数据。

接下来就要执行拦截器链路,最后一个拦截器执行完毕后如果无异常就要执行目标方法。调用目标方法通常会使用到第三方的RPC框架,例如 Fegin,RestTemplate等等

在引入的spring-cloud-seata包下,基于SpringbootStater机制,将会自动装配四个配置类,我们就看最常用的SeataFeignClientAutoConfiguration

image-20231123095810260

public class SeataFeignClientAutoConfiguration {public SeataFeignClientAutoConfiguration() {}@Bean@Scope("prototype")@ConditionalOnClass(name = {"com.netflix.hystrix.HystrixCommand"})@ConditionalOnProperty(name = {"feign.hystrix.enabled"},havingValue = "true")Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {return SeataHystrixFeignBuilder.builder(beanFactory);}@Bean@Scope("prototype")@ConditionalOnClass(name = {"com.alibaba.csp.sentinel.SphU"})@ConditionalOnProperty(name = {"feign.sentinel.enabled"},havingValue = "true")Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {return SeataSentinelFeignBuilder.builder(beanFactory);}@Bean@ConditionalOnMissingBean@Scope("prototype")Feign.Builder feignBuilder(BeanFactory beanFactory) {return SeataFeignBuilder.builder(beanFactory);}
}

实际上无论是否集成hystrix或者sentinel , 最终构建的都是 SeataFeignClient 对象

static Feign.Builder builder(BeanFactory beanFactory) {return SentinelFeign.builder().retryer(Retryer.NEVER_RETRY).client(new SeataFeignClient(beanFactory));}

而SeataFeignClient在发送请求时,会将Request进行修改

com.alibaba.cloud.seata.feign.SeataFeignClient#execute

 public Response execute(Request request, Request.Options options) throws IOException {//修改请求信息Request modifiedRequest = this.getModifyRequest(request);return this.delegate.execute(modifiedRequest, options);}
    private Request getModifyRequest(Request request) {//获取开启全局事务时绑定到RootContext中到XIDString xid = RootContext.getXID();if (StringUtils.isEmpty(xid)) {return request;} else {// 放到请求头中Map<String, Collection<String>> headers = new HashMap(16);headers.putAll(request.headers());List<String> seataXid = new ArrayList();seataXid.add(xid);headers.put("TX_XID", seataXid);// 返回添加了 TX_XID 请求头的新Requestreturn Request.create(request.method(), request.url(), headers, request.body(), request.charset());}}

这里需要注意一点,在发起RPC调用时,需要保证跟开启全局事务的方法在同一个线程中,因为RootContext是将XID存储在ThreadLocal中,每个Thread都绑定了自己的ThreadLocal.ThreadLocalMap,而ThreadLocal.get就是从当前线程中获取ThreadLocalMap,再去获取存储在线程中的变量。

2.数据源代理

image-20231123104326151

public class SeataDataSourceAutoConfiguration {/*** The bean seataAutoDataSourceProxyCreator.*/@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {// spring上下文中添加SeataAutoDataSourceProxyCreatorreturn new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());}
}

SeataAutoDataSourceProxyCreator 继承了 AbstractAutoProxyCreator

    private Object[] buildAdvisors(String dataSourceProxyMode) {// 使用 SeataAutoDataSourceProxyAdvice 对目标方法进行拦截Advice advice = new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);return new Object[]{new DefaultIntroductionAdvisor(advice)};}@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) {return advisors;}@Overrideprotected boolean shouldSkip(Class<?> beanClass, String beanName) {//排除指定列表的beanif (excludes.contains(beanClass.getName())) {return true;}//对数据源对象进行拦截return SeataProxy.class.isAssignableFrom(beanClass);}protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// we only care DataSource beanif (!(bean instanceof DataSource)) {return bean;}// when this bean is just a simple DataSource, not SeataDataSourceProxyif (!(bean instanceof SeataDataSourceProxy)) {Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);// this mean this bean is either excluded by user or had been proxy beforeif (bean == enhancer) {return bean;}// else, build proxy,  put <origin, proxy> to holder and return enhancerDataSource origin = (DataSource) bean;// 创建代理类 AT模式为 DataSourceProxy  XA模式为 DataSourceProxyXASeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);// 将 DataSource 与  SeataDataSourceProxy 映射缓存DataSourceProxyHolder.put(origin, proxy);return enhancer;}

io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyAdvice#invoke

    public Object invoke(MethodInvocation invocation) throws Throwable {// check whether current context is expectedif (!inExpectedContext()) {return invocation.proceed();}Method method = invocation.getMethod();String name = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();Method declared;try {declared = DataSource.class.getDeclaredMethod(name, parameterTypes);} catch (NoSuchMethodException e) {// mean this method is not declared by DataSourcereturn invocation.proceed();}// switch invoke instance to its proxyDataSource origin = (DataSource) invocation.getThis();SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);Object[] args = invocation.getArguments();// 方法由 SeataDataSourceProxy 代理类执行return declared.invoke(proxy, args);}

获取Connection对象

假设当前为seata的全局事务模式为AT,当原始数据源调用getConnection方法获取连接时,实际上调用的是DataSourceProxy.getConnection方法,拿到的是ConnectionProxy对象

    @Overridepublic ConnectionProxy getConnection() throws SQLException {Connection targetConnection = targetDataSource.getConnection();return new ConnectionProxy(this, targetConnection);}@Overridepublic ConnectionProxy getConnection(String username, String password) throws SQLException {Connection targetConnection = targetDataSource.getConnection(username, password);return new ConnectionProxy(this, targetConnection);}

获取Statement对象

AbstractConnectionProxy

 public Statement createStatement() throws SQLException {Statement targetStatement = getTargetConnection().createStatement();return new StatementProxy(this, targetStatement);}@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {String dbType = getDbType();// support oracle 10.2+PreparedStatement targetPreparedStatement = null;if (BranchType.AT == RootContext.getBranchType()) {List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);if (sqlRecognizers != null && sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);}}}if (targetPreparedStatement == null) {targetPreparedStatement = getTargetConnection().prepareStatement(sql);}return new PreparedStatementProxy(this, targetPreparedStatement, sql);}

而获取的Statement 与 PreparedStatement 分别是 StatementProxy 与 PreparedStatementProxy,而两者都是由ExecuteTemplate来执行sql语句,假设RM此时要执行的业务是更新操作

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {// 如果当前即不需要全局锁 并且事务类型不为AT模式,那么直接执行原生SQLif (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {// Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}// 获取数据源类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);// 区分当前执行的SQL语句类型switch (sqlRecognizer.getSQLType()) {case INSERT:executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},new Object[]{statementProxy, statementCallback, sqlRecognizer});break;// 假设当前执行的是UPDATAE语句case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch (dbType) {case JdbcConstants.MYSQL:case JdbcConstants.MARIADB:executor =new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {// 执行器执行SQL语句rs = executor.execute(args);} catch (Throwable ex) {if (!(ex instanceof SQLException)) {// Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}

UPDATE语句使用的是UpdateExecutor执行器,而UpdateExecutor的类关系图如下

image-20231126101103175

由父类BaseTransactionalExecutor 调用execute方法,将RootContext中的XID绑定到StatementProxy中,并标识是否需要全局锁

   public T execute(Object... args) throws Throwable {String xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}

关闭自动提交事务并执行sql

接着调用子类AbstractDMLBaseExecutor的doExecute方法,主要是关闭自动提交,然后执行 executeAutoCommitFalse 方法

    public T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();// connectionProxy.getAutoCommit() 会判断原始数据源对象是否是自动提交事务 (JDBC默认提交事务)// 但在seata中 即使原始数据源是自动化提交,在 executeAutoCommitTrue 方法中也会关闭自动提交事务if (connectionProxy.getAutoCommit()) {// 将自动提交设置为false,再调用executeAutoCommitFalsereturn executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse

    protected T executeAutoCommitFalse(Object[] args) throws Exception {// 要求当前数据源是MYSQL, 且不能是多主键if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");}// 封装前镜像数据 TableRecords 记录了每一个字段对应的值,且获取的是当前最新的数据TableRecords beforeImage = beforeImage();// 执行sql语句T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();//修改成功if (updateCount > 0) {//封装后镜像数据TableRecords afterImage = afterImage(beforeImage);//将前后镜像打包成回滚日志 其中包括全局锁信息prepareUndoLog(beforeImage, afterImage);}return result;}

博主使用的seata版本是1.5.2,此版本不支持多主键.

为了保证前镜像获取的数据是最新的,创建SQL查询语句时会拼接 FOR UPDATE ,使用当前读的方式。成功修改数据后,会将修改后的数据也构建成镜像数据,最后由前后镜像一起组成回滚日志。

提交事务

由于我们获取的数据源连接对象时ConnectionProxy,因此提交事务时也时此对象进行提交

io.seata.rm.datasource.ConnectionProxy#commit

    public void commit() throws SQLException {try {//由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {// 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XIDrollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

io.seata.rm.datasource.ConnectionProxy#doCommit

    private void doCommit() throws SQLException {//全局事务XID不为空if (context.inGlobalTransaction()) {processGlobalTransactionCommit();//要求全局锁} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();} else {//都不是直接提交targetConnection.commit();}}

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

    public void commit() throws SQLException {try {//由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制// 如果是获取全局锁失败,将会重试lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {// 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XIDrollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

由lockRetryPolicy手动提交事务,而如果出现全局锁冲突,将会在线程休眠后重试

io.seata.rm.datasource.ConnectionProxy.LockRetryPolicy#execute

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {LockRetryController lockRetryController = new LockRetryController();while (true) {try {return callable.call();} catch (LockConflictException lockConflict) {//先回滚本地事务onException(lockConflict);// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is releasedif (connection.getContext().isAutoCommitChanged()&& lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);}//休眠10秒收重试lockRetryController.sleep(lockConflict);} catch (Exception e) {onException(e);throw e;}}}

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

private void processGlobalTransactionCommit() throws SQLException {try {//在回滚日志已经准备好的情况下 向TC注册分支事务,并将分支事务ID保存到ConnectionContext中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {// 将回滚日志刷新到数据库中 等待和本地事务一起提交UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//本地事务提交targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {report(true);}context.reset();}

Undo回滚日志将和本地事务一起提交,但在提交前,需要先注册分支事务ID

io.seata.rm.AbstractResourceManager#branchRegister

    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}

注册分支事务需要将当前全局事务的XID,全局锁,资源信息,分支事务类型以及应用数据,一起构建成BranchRegisterRequest消息发送给TC,具体如下数据所示

{"branchType": "AT","lockKey": "product_table:1","resourceId": "jdbc:mysql://localhost:3306/seata-test","typeCode": 11,"xid": "xxx.xx.xx.xxx:8091:1378562425205329921"
}

3.TC处理 BranchRegisterRequest 消息

TC在处理 BranchRegisterRequest消息 依然也是 ServerOnRequestProcessor,最终交由AbstractTCInboundHandle进行处理

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.BranchRegisterRequest, io.seata.core.rpc.RpcContext)

 public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {BranchRegisterResponse response = new BranchRegisterResponse();exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {@Overridepublic void execute(BranchRegisterRequest request, BranchRegisterResponse response)throws TransactionException {try {//真正处理分支注册doBranchRegister(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);}}}, request, response);return response;}

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister

 @Overrideprotected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());// 获取分支IDresponse.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),request.getXid(), request.getApplicationData(), request.getLockKey()));}

io.seata.server.coordinator.DefaultCore#branchRegister

 @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// 当前事务类型为AT模式,返回 ATCore 在调用 branchRegister 方法return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,applicationData, lockKeys);}

io.seata.server.coordinator.AbstractCore#branchRegister

 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);return SessionHolder.lockAndExecute(globalSession, () -> {// 全局事务当前必须是激活且是beging状态 否则抛出异常globalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//注册分支事务回话(将分支事务信息绑定上全局事务XID)BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));//加全局锁branchSessionLock(globalSession, branchSession);try {//将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中globalSession.addBranch(branchSession);} catch (RuntimeException ex) {//如果出现异常,从数据库中删除分支事务信息branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);}return branchSession.getBranchId();});}

首先会判断当前全局事务的状态,接着创建一个新的BranchSession对象,生成branchId,并绑定上XID;接着就是尝试加全局锁,加锁失败会抛出异常;然后再将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中,并返回分支事务ID。如果此过程出现异常,则会删除分支事务信息。

判断全局事务状态

            // 全局事务当前必须是激活且是beging状态 否则抛出异常globalSessionStatusCheck(globalSession);

io.seata.server.coordinator.AbstractCore#globalSessionStatusCheck

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {//全局事务必须是激活状态if (!globalSession.isActive()) {throw new GlobalTransactionException(GlobalTransactionNotActive, String.format("Could not register branch into global session xid = %s status = %s, cause by globalSession not active",globalSession.getXid(), globalSession.getStatus()));}//全局事务必须是begin状态if (globalSession.getStatus() != GlobalStatus.Begin) {throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String.format("Could not register branch into global session xid = %s status = %s while expecting %s",globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));}}

全局事务当前必须是激活且是beging状态 否则抛出异常

注册分支事务

BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);

io.seata.server.session.SessionHelper#newBranchByGlobal(io.seata.server.session.GlobalSession, io.seata.core.model.BranchType, java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,String applicationData, String lockKeys, String clientId) {//创建新的BranchSession对象BranchSession branchSession = new BranchSession();//分别绑定XID,事务ID,新生成的分支事务id 并设置事务类型与资源信息,锁信息,RM客户端信息(服务名和id)branchSession.setXid(globalSession.getXid());branchSession.setTransactionId(globalSession.getTransactionId());branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);branchSession.setApplicationData(applicationData);return branchSession;}

最终的分支信息如图所示

image-20231126172153705

添加全局锁

//加全局锁branchSessionLock(globalSession, branchSession);

io.seata.server.transaction.at.ATCore#branchSessionLock

 @Overrideprotected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)throws TransactionException {String applicationData = branchSession.getApplicationData();boolean autoCommit = true;boolean skipCheckLock = false;//applicationData为空 往下走if (StringUtils.isNotBlank(applicationData)) {if (objectMapper == null) {objectMapper = new ObjectMapper();}try {Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);Object clientAutoCommit = data.get(AUTO_COMMIT);if (clientAutoCommit != null && !(boolean)clientAutoCommit) {autoCommit = (boolean)clientAutoCommit;}Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);if (clientSkipCheckLock instanceof Boolean) {skipCheckLock = (boolean)clientSkipCheckLock;}} catch (IOException e) {LOGGER.error("failed to get application data: {}", e.getMessage(), e);}}try {//加全局锁if (!branchSession.lock(autoCommit, skipCheckLock)) {throw new BranchTransactionException(LockKeyConflict,String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}} catch (StoreException e) {if (e.getCause() instanceof BranchTransactionException) {throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()));}throw e;}}

io.seata.server.session.BranchSession#lock(boolean, boolean)

public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (this.getBranchType().equals(BranchType.AT)) {//获取锁资源管理器执行加速操作return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);}return true;}

无论是哪种锁资源管理器,其调用acquireLock都是走的父类AbstractLockManager的方法

io.seata.server.lock.AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)

 @Overridepublic boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {if (branchSession == null) {throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");}// lockKey通常是由 表名 + 主键组成String lockKey = branchSession.getLockKey();if (StringUtils.isNullOrEmpty(lockKey)) {// no lockreturn true;}// get locks of branch// 使用 ; 分割lockKey 获取RowLock集合,RowLock封装了 XID,分支事务ID,表民,主键等信息List<RowLock> locks = collectRowLocks(branchSession);if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}// 获取枷锁操作对象,调用acquireLock方法return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);}

我们就看 DataBaseLocker 是怎么加锁的

    public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {// 首先RowLock集合对象不能为空if (CollectionUtils.isEmpty(locks)) {// no lockreturn true;}try {return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);return false;}}

io.seata.server.storage.db.lock.LockStoreDataBaseDAO#acquireLock(java.util.List<io.seata.core.store.LockDO>, boolean, boolean)

 public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {Connection conn = null;PreparedStatement ps = null;ResultSet rs = null;Set<String> dbExistedRowKeys = new HashSet<>();boolean originalAutoCommit = true;if (lockDOs.size() > 1) {lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());}try {conn = lockStoreDataSource.getConnection();if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);}List<LockDO> unrepeatedLockDOs = lockDOs;//check lockif (!skipCheckLock) {boolean canLock = true;//query// select * from lock_table where row_key in ( ? ) order by status desc// RowKey 由数据库资源信息 + 表名 + 主键组成String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());ps = conn.prepareStatement(checkLockSQL);for (int i = 0; i < lockDOs.size(); i++) {ps.setString(i + 1, lockDOs.get(i).getRowKey());}rs = ps.executeQuery();String currentXID = lockDOs.get(0).getXid();boolean failFast = false;// 如果在lock_tabel中根据  RowKey 找到了记录,需要判断当前xid是否与数据库中的xid是否一致// 如果不一致 加速失败while (rs.next()) {String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);if (!StringUtils.equals(dbXID, currentXID)) {if (LOGGER.isInfoEnabled()) {String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);}if (!autoCommit) {int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);if (status == LockStatus.Rollbacking.getCode()) {failFast = true;}}canLock = false;break;}// 将已经存在的全局锁缓冲起来dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));}if (!canLock) {// 加锁失败 回滚保存分支事务信息的sqlconn.rollback();if (failFast) {throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));}return false;}// If the lock has been exists in db, remove it from the lockDOsif (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())).collect(Collectors.toList());}if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {conn.rollback();return true;}}// lockif (unrepeatedLockDOs.size() == 1) {LockDO lockDO = unrepeatedLockDOs.get(0);// 生成sql语句,将全局锁信息插入到lock_table表中if (!doAcquireLock(conn, lockDO)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());}conn.rollback();return false;}} else {if (!doAcquireLocks(conn, unrepeatedLockDOs)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));}conn.rollback();return false;}}conn.commit();return true;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(rs, ps);if (conn != null) {try {if (originalAutoCommit) {conn.setAutoCommit(true);}conn.close();} catch (SQLException e) {}}}}

注册全局锁之前会根据RowKey查询全局锁信息是否已经存在,生成的查询语句如下

select * from lock_table where row_key in ( ? ) order by status desc

RowKey如下所示

jdbc:mysql://localhost:3306/seata-test^^^product_table^^^1

如果locktable表中已经有数据了,则比较数据中XID与当前当前的xid是否一致,如果不一致说明已经被其他全局事务加了锁,则标记当前加锁失败,并抛出异常;如果一致,则将当前的RowKey缓冲到set集合中。最后调用LockStoreDataBaseDAO.doAcquireLock方法将锁信息保存到数据库lock_table表中。

持久化分支信息

成功添加上全局锁后,就要将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中

globalSession.addBranch(branchSession);

io.seata.server.session.GlobalSession#addBranch

    public void addBranch(BranchSession branchSession) throws TransactionException {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 分支事务持久化lifecycleListener.onAddBranch(this, branchSession);}// 将分支事务标记为已注册状态branchSession.setStatus(BranchStatus.Registered);// 将分支事务添加到GlobalSession到List集合中add(branchSession);}

返回分支事务ID

 return branchSession.getBranchId();

最后携带刚生成的分支事务ID返回BranchRegisterResponse消息给到RM

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/205353.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Diffusion:通过扩散和逆扩散过程生成图像的生成式模型

在当今人工智能大火的时代&#xff0c;AIGC 可以帮助用户完成各种任务。作为 AIGC 主流模型的 DDPM&#xff0c;也时常在各种论文中被提起。DDPM 本质就是一种扩散模型&#xff0c;可以用来生成图片或者为图片去噪。 扩散模型定义了一个扩散的马尔科夫过程&#xff0c;每一步逐…

Day01 嵌入式 -----流水灯

一、简单介绍 嵌入式系统中的流水灯是一种常见的示例项目&#xff0c;通常用于演示嵌入式系统的基本功能和控制能力。流水灯由多个发光二极管&#xff08;LED&#xff09;组成&#xff0c;这些LED按照一定的顺序依次点亮和熄灭&#xff0c;形成一种像水流一样的流动效果。 二、…

VsCode连接远程Linux编译环境的便捷处理

1.免输登录密码 免输命令的正确方法是使用公钥和私鈅在研发设备&#xff0c;和linux服务器上校验身份。公钥和私钥可在windows系统上生成。公钥要发送到linux服务器。私钥需要通知给本地的ssh客户端程序&#xff0c;相关的操作如下&#xff1a; 生成 SSH Key&#xff1a; 打开…

uniapp小程序项目连接微信客服【最新/最全教程】

目录 文档微信官网文档图片微信小程序客服配置官网 效果图聊天地址手机微信电脑端 微信聊天功能实现微信小程序后台添加客服微信号以及配置代码实现参考最后 文档 微信官网文档 微信官网文档 图片 微信小程序客服配置官网 微信小程序客服配置官网 效果图 聊天地址 地址 手…

解决PDF预览时,电子签章、日期等不显示问题

文章目录 问题描述问题排查问题解决 问题描述 在预览PDF时&#xff0c;部分签章或控件没有显示。如下图&#xff1a; 正确应该要这样&#xff1a; 问题排查 根据网上搜索&#xff0c;排查&#xff0c;我先看看&#xff0c;pdf.worker.js 里的这三行代码&#xff0c;是否已经注…

将原生Spring项目中某些配置文件中的易变内容参数化

&#x1f609;&#x1f609; 学习交流群&#xff1a; ✅✅1&#xff1a;这是孙哥suns给大家的福利&#xff01; ✨✨2&#xff1a;我们免费分享Netty、Dubbo、k8s、Mybatis、Spring...应用和源码级别的视频资料 &#x1f96d;&#x1f96d;3&#xff1a;QQ群&#xff1a;583783…

获得文件MD5——校验完整性 window 和 Linux下操作

目录 引出window下获得文件MD5Linux下获得文件MD5单个文件整个目录下所有文件检查MD5 总结 引出 1.Windows 10 自带了一个命令行程序 certutil可以 获取文件的 MD5 值&#xff1b; 2.Linux下md5sum命令获得文件MD5值&#xff1b; window下获得文件MD5 Windows 10 自带了一个命…

王道p150 14.假设二叉树采用二叉链表存储结构,设计一个算法,求非空二叉树 b的宽度(即具有结点数最多的那一层的结点个数) (c语言代码实现)

采用层次遍历的方法求出所有结点的层次&#xff0c;并将所有结点和对应的层次放在一个队列中。然后通过扫描队列求出各层的结点总数&#xff0c;最大的层结点总数即为二叉树的宽度。 /* A B C D E F …

Java实现通过经纬度求两个任意地点在球面上的距离

我们在实际开发中会获取对应的经纬度&#xff0c;可以使用ES大数据搜索引擎进行计算对应区域的数据&#xff0c;那我们在如何根据两个经纬度获取对应的球面距离&#xff0c;就是在地球上从一个地点到另一个地点的直线距离 工具类如下: public class GeoUtils {// 地球半径&am…

ANN人工神经网络:从基础认知到现实理解

什么是神经网络&#xff1f; 神经网络的再认知 前面我们了解过&#xff0c;人工神经网络&#xff08;Artificial Neural Network&#xff0c;ANN&#xff09;是人类为了模仿人大脑的神经网络结构创建出来的一种计算机系统结构。但如果仔细深入到神经网络当中&#xff0c;会慢…

数据库管理-第118期 记一次开启附加日志导致的性能问题(202301129)

数据库管理-第118期 记一次开启附加日志导致的性能问题&#xff08;202301129&#xff09; 本周二凌晨&#xff0c;为了配合某国产数据库从Oracle数据库能够实时同步数据&#xff0c;在X9M那套一体机上做了开启附加日志的操作&#xff0c;也正是因为这个操作带来了一些小问题。…

【机器学习 | 可视化系列】可视化系列 之 决策树可视化

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

MySQL(免密登录)

简介: MySQL免密登录是一种允许用户在没有输入密码的情况下直接登录到MySQL服务器的配置。这通常是通过在登录时跳过密码验证来实现的。 1、修改MySQL的配置文件 使用vi /etc/my.cnf&#xff0c;添加到【mysqld】后面 skip-grant-tables #配置项告诉mysql跳过权限验证&#…

Python基础语法之学习数据转换

Python基础语法之学习数据转换 一、代码二、效果 一、代码 #数字转换成字符串 num_str str(11) print(type(num_str))#字符串转整数 numint("11") print(type(num),num)#浮点数转整数 float_num int(11.1) print(type(float_num),float_num)#整数转浮点数 num_flo…

Jmeter工具学习三——CSV文件、关联、断言

Jmeter学习三——CSV文件和关联 jmeter做功能测试和做性能测试的区别CSV数据文件设置&#xff08;读取外部文件&#xff0c;进行分数据驱动&#xff09;文件设置字段介绍&#xff1a;文件名文件编码如果出现编码问题导致的乱码&#xff0c;如何解决&#xff1f; 变量名忽略首行…

品牌全渠道营销系统如何与不同经销商ERP打通

品牌商在与各经销商ERP系统打通方面面临的挑战。传统的ERP系统往往使得数据收集和合作变得繁琐且低效&#xff0c;导致市场响应迟缓&#xff0c;影响整体的供应链管理和市场决策。我们的解决方案旨在破解这一难题&#xff0c;提供一个全渠道营销系统&#xff0c;它能自动与各类…

Openlayer【三】—— 绘制多边形GeoJson边界绘制

1.1、绘制多边形 在绘制多边形和前面绘制线有异曲同工之妙&#xff0c;多边形本质上就是由多个点组成的线然后连接组成的面&#xff0c;这个面就是最终的结果&#xff0c;那么这里使用到的是Polygon对象&#xff0c;而传给这个对象的值也是多个坐标&#xff0c;坐标会一个个的…

Oracle Linux 9.3 发布

导读Oracle Linux 9 系列发布了第 3 个版本更新&#xff0c;支持 64 位 Intel 和 AMD (x86_64) 以及 64 位 Arm (aarch64) 平台。与所有的 Oracle Linux 版本一样&#xff0c;此版本与相应 RHEL 版本 100% 应用二进制兼容。 对于 x86_64 和 aarch64 架构&#xff0c;Oracle Li…

rabbitmq-server-3.11.10.exe

rabbitmq需要erlang环境 otp_win64_25.1.exe erlang-CSDN博客 https://www.rabbitmq.com/download.htmlhttps://www.rabbitmq.com/install-windows.htmlhttps://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.10/rabbitmq-server-3.11.10.exe C:\Users\Admi…

【蓝桥杯选拔赛真题25】C++两个数比大小 第十三届蓝桥杯青少年创意编程大赛C++编程选拔赛真题解析

目录 C/C++两个数比大小 一、题目要求 1、编程实现 2、输入输出 二、算法分析