我们上篇简单梳理了下TM、RM的一些流程(离现在过得挺久的了,这篇我们这篇来梳理下TC的内容。
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
一、TC的定义
TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚
这个是官网给出的定义,就是说其实全局事务的协调。具体来说例如:当某个TM(事务管理器)需要开启事务的时候,就需要一个协调者来驱动全局事务或提交,或回滚,这个概念我们开始的时候很容易与TM混淆。我们看到这两个一个是开始,一个是驱动,具体通过demo来说。
1、一些基本逻辑信息
用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
- 仓储服务(Storage):对给定的商品扣除仓储数量。
- 订单服务(Order):根据采购需求创建订单。
- 帐户服务(Account):从用户帐户中扣除余额。
- 业务服务(Business):发起整个业务逻辑。
这个官网给出的例子,就是Business服务发起本次业务逻辑,也就是通过@GlobalTransactional
来开始全局事务,然后后面的也就是TM的一些逻辑处理,这个我们上篇有说过,就是去调TC表示自己要开始全局事务,然后TC就会生成全局事务的XID
,之后其调其他的分支事务Stock、Order的话就带上这个全局事务XID
。然后这里TC就是一个协调者的角色,其他的TM、RM都想TC注册。例如如果后面如果@GlobalTransactional
方法正常执行的话,就事务提交,其就向TC发送消息,然后TC去协调分支事务的提交。
下面我们就来具体看下在TC中这些注册、提交、回滚等的操作。
二、TC的一些逻辑处理
1、前置内容
首先在TC这边,关于对应请求的处理,入口都是RemotingProcessor
接口的具体实现
其的子类:
2、RM通道注册
RM(资源管理器)注册。我们上面有介绍过,Seata会代理服务关于数据库的操作。然后其数据源初始化的时候,就会向TC注册。
但我们需要注意,这个目前还只是channel的通道连接,不是具体的RM资源注册,具体的RM事务资源注册是在全局事务开始然后操作数据库的时候。
对应到TC这边就是RegRmProcessor
在处理,这个主要是保存Channel通道,与Netty相关的内容,目前还没具体研究,但影响不大。先不深入这个。
3、TM通道注册
这个与前面的RM是类似的。
4、ServerOnRequestProcessor
然后这个是一个核心的处理类,其用来处理各种类型的信息。
我们可以看到,像分支注册、全局事务的开始、回滚都是这个Process处理的。
public class ServerOnRequestProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);private RemotingServer remotingServer;private TransactionMessageHandler transactionMessageHandler;public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {this.remotingServer = remotingServer;this.transactionMessageHandler = transactionMessageHandler;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {........if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());...........if (message instanceof MergedWarpMessage) {AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];for (int i = 0; i < results.length; i++) {final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);} else {// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}}
然后处理的核心就是DefaultCoordinator
,上面的AbstractRMHandler
是RM的处理
5、DefaultCoordinator
这里的主要用来处理TC的逻辑就是TCInboundHandler
接口:
通过这些Request我们也能找到其主要是用来处理全局事务的开启、分支事务的注册、全局事务的提交、回滚、状态这些。下面我们就来主要看下看下GlobalBeginRequest
、BranchRegisterRequest
、GlobalCommitRequest
的处理。
1、GlobalBeginRequest的处理
通过这里我们可以找到,在GlobalBegin开启全局事务的主要处理就是生成全局事务Xid返回。
public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,int timeout) {GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);return session;
}
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;this.xid = XID.generateXID(transactionId);}
这里是全局事务的session,然后再通过session.begin
来进行对应处理,例如将全局事务session保存到数据库:
我们可以看到这里有多种处理方式、例如保存到文件、redis这些。目前我们是保存到数据库中,处理的就是DataBaseSessionManager
public class DataBaseSessionManager extends AbstractSessionManagerimplements Initialize {............@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}}
这里之后就是DataBaseTransactionStoreManager
处理入库逻辑
2、BranchRegisterRequest分支事务注册的处理
再之后我们来看下分支事务的注册处理逻辑。分支事务的注册就是每个RM对应开始操作DB的时候(这个RM相关的内容上篇有提到过),向TC发起分支事务注册,代码逻辑就是放在seata代理的jdbc的ConnectionProxy
然后TC这边就会处理这个请求,可以看到这里就是生成并返回分支事务ID
同时会将当前分支事务保存到数据库中
3、GlobalCommitRequest全局事务提交的处理
我们上篇有提到,再TM发起全局事务后,如果各个分支都执行成功后,其会向TC发起全局事务的提交。
对于我们官方给到的demo。我们看下关于事务相关的表的数据。
然后各个分支都执行完成后,由TM来提交全局事务
在TM提交后,我们来看下TC的处理:
![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=F%3A%5CMarkDown-File%5C%E7%9F%A5%E8%AF%86%E7%82%B9%E8%B5%84%E6%96%99%5CSeata%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-TC.assets%5C
&pos_id=img-hmn2nU2b-1695536657221)
这里我们主要需要注意SessionHelper.forEach(globalSession.getSortedBranches(), branchSession
对于每个branchSession
的循环调用,也就是每个分支事务的session,在这里就会循环,然后调用每个分支进行分支事务的本地提交,也就是RM
本地提交的处理。
然后RM的处理主要就是RMHandlerAT
这个类在处理,因为我们当前是AT模式
然后ResourceManager
就是DataSourceManager
这里主要是添加到异步队列中。
然后主要处理就是删除数据库的undo
记录,就我们全局事务成功了,不需要使用undo
来回滚本地事务了。