【Seata源码学习 】 篇二 TM与RM初始化过程

【Seata源码学习 】 篇二 TM与RM初始化过程

1.GlobalTransactionScanner 初始化

GlobalTransactionScanner 实现了InitializingBean 接口,在初始化后将执行自定义的初始化方法

io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet

   @Overridepublic void afterPropertiesSet() {//是否禁用了全局事务if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}return;}//初始化客户端initClient();}

io.seata.spring.annotation.GlobalTransactionScanner#initClient

private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +"please change your default configuration as soon as possible " +"and we don't recommend you to use default tx-service-group's value provided by seata",DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);}if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));}//init TM//初始化事务管理器TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}//init RM//初始化资源管理器RMClient.init(applicationId, txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}//注册应用上下文关闭回调方法registerSpringShutdownHook();}

image-20231113213409054

2. 初始化事务管理器 TM

流程图

image-20231114222639955

实例化 TmNettyRemotingClient

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static void init(String applicationId, String transactionServiceGroup) {//TM进行netty网络通信的客户端// applicationId 当前应用id  transactionServiceGroup 事务分组名称TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);tmNettyRemotingClient.init();}

首先看下获取实例的方法

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {TmNettyRemotingClient tmNettyRemotingClient = getInstance();tmNettyRemotingClient.setApplicationId(applicationId);tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);return tmNettyRemotingClient;}

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance()

    public static TmNettyRemotingClient getInstance() {//双检锁,保证只有一个实例if (instance == null) {synchronized (TmNettyRemotingClient.class) {if (instance == null) {//netty的配置NettyClientConfig nettyClientConfig = new NettyClientConfig();//消息处理线程池//核心线程和最大线程都是16 没有非核心线程//有界的阻塞队列 容量为200final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),nettyClientConfig.getClientWorkerThreads()),RejectedPolicies.runsOldestTaskPolicy());//创建实例instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}

io.seata.core.rpc.netty.TmNettyRemotingClient#TmNettyRemotingClient

private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor) {//调用父类构造器super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);//基于SPI机制加载鉴权签名组件 AuthSignerthis.signer = EnhancedServiceLoader.load(AuthSigner.class);// set enableClientBatchSendRequest// 是否开启了批量发送请求对配置。默认 falsethis.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);//监听配置是否有变化ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() {@Overridepublic void onChangeEvent(ConfigurationChangeEvent event) {String dataId = event.getDataId();String newValue = event.getNewValue();if (ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.equals(dataId) && StringUtils.isNotBlank(newValue)) {enableClientBatchSendRequest = Boolean.parseBoolean(newValue);}}});}

实例化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#AbstractNettyRemotingClient

 public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {//调用父类构造器 用于处理消息的线程池super(messageExecutor);//当前事务角色this.transactionRole = transactionRole;//创建 NettyClientBootstrap 实例 clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//消息处理器  clientBootstrap.setChannelHandlers(new ClientHandler());//channel管理器clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}

实例化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#AbstractNettyRemoting

  public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {//设置处理消息的线程池this.messageExecutor = messageExecutor;}

初始化 TmNettyRemotingClient

回到

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

创建完成 TmNettyRemotingClient 实例后,调用init方法

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
    @Overridepublic void init() {// registry processor//注册请求处理器registerProcessor();if (initialized.compareAndSet(false, true)) {//调用父类初始化方法 // 1. 定时重连// 2. 超时检测super.init();//如果事务分组不为空if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//通过channel管理器建立链接getClientChannelManager().reconnect(transactionServiceGroup);}}}

io.seata.core.rpc.netty.TmNettyRemotingClient#registerProcessor

   private void registerProcessor() {//根据不同的消息类型,使用不同的消息处理器//两个处理器 一种对消息进行处理//还有一种是处理心跳// 1.registry TC response processorClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());//注册就是将 消息处理器与线程池封装成一对pair,然后在进一步封装成map,map对key为消息类型,value为封装对pairsuper.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 2.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}
 @Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}

初始化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#init

    @Overridepublic void init() {//周期线程池 第一次在60秒后通过连接管理器重新建立链接,之后每10秒重新建立一次链接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);if (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}//启动一个周期线程池,每3秒检查一次请求是否超时super.init();//启动netty客户端clientBootstrap.start();}

io.seata.core.rpc.netty.NettyClientBootstrap#start

启动过程中一共设置了4个消息处理器

  1. IdleStateHandler 处理心跳
  2. ProtocolV1Decoder 消息解码
  3. ProtocolV1Encoder 消息编码
  4. ClientHandler 消息处理
 @Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),nettyClientConfig.getClientWorkerThreads()));}this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds())).addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}

io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler

  @Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}

io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();//顶层接口 MessageTypeAwareif (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息的类型获取不同的RemotingProcessor进行处理final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {//如果线程池不为空,使用线程池执行 前面封装pair时,线程池都是nullif (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());if (allowDumpStack) {String name = ManagementFactory.getRuntimeMXBean().getName();String pid = name.split("@")[0];long idx = System.currentTimeMillis();try {String jstackFile = idx + ".log";LOGGER.info("jstack command will dump to " + jstackFile);Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack = false;}}} else {//如果消息处理器对应的线程池是空的,则直接处理try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}

初始化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#init

public void init() {//每3秒检查一次请求是否超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}

TM客户端与 TC 建立连接

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect

void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务分组名称找seata服务端地址列表 默认根据File配置映射关系查找//tx-service-group 事务分组名//vgroup-mapping.事务分组名=分组seata服务列表名//seata.service.grouplist.分组seata服务列表名=seata服务地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);//如果找不到任何seata server 服务配置列表,抛出异常if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}Set<String> channelAddress = new HashSet<>(availList.size());try {for (String serverAddress : availList) {try {//与所有seata server建立长连接acquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),serverAddress, e.getMessage(), e);}}} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}}}

3.初始化资源管理器 RM

io.seata.rm.RMClient#init

    public static void init(String applicationId, String transactionServiceGroup) {//获取RmNettyRemotingClient实例RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);//设置资源管理器rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());//设置资源事务管理器rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());//初始化RmNettyRemotingClientrmNettyRemotingClient.init();}

实例化 RmNettyRemotingClient

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

  public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {//获取实例,并创建消息处理线程池RmNettyRemotingClient rmNettyRemotingClient = getInstance();//设置应用idrmNettyRemotingClient.setApplicationId(applicationId);//设置事务分组名rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);return rmNettyRemotingClient;}

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance()

public static RmNettyRemotingClient getInstance() {//双检锁创建实例 保证单例if (instance == null) {synchronized (RmNettyRemotingClient.class) {if (instance == null) {//netty客户端配置NettyClientConfig nettyClientConfig = new NettyClientConfig();//消息处理线程池final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}

RmNettyRemotingClient 及其父类的实例化过程都与TM是一致的,我们可以看下继承关系图

截屏2023-11-16 21.15.23

真正有区别的地方在于TM客户端初始化的过程与RM客户端初始化的过程

初始化 RmNettyRemotingClient

//获取RmNettyRemotingClient实例RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);//设置资源管理器rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());//设置资源事务管理器rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());//初始化RmNettyRemotingClientrmNettyRemotingClient.init();

io.seata.core.rpc.netty.RmNettyRemotingClient#init

    public void init() {// 注册消息处理器registerProcessor();if (initialized.compareAndSet(false, true)) {super.init();// Found one or more resources that were registered before initialization// 与TC建立连接前 会先判断资源是否存在if (resourceManager != null&& !resourceManager.getManagedResources().isEmpty()&& StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}

io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

private void registerProcessor() {//注册类五种不同的消息处理器// 1.registry rm client handle branch commit processor// 分支事务提交RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);// 2.registry rm client handle branch rollback processor// 分支事务回滚RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);// 3.registry rm handler undo log processor// 回滚日志处理RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);// 4.registry TC response processor// TC响应处理ClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 5.registry heartbeat message processor// 心跳信息处理ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}

以上就是TM和RM实例化的过程,至于不同的消息处理器的实现我们放到后面去看

4. TM和RM初始化总结

截屏2023-11-16 21.51.27

两者其实过程是一致的,TM客户端对象TMClient主要是实例并初始化TmNettyRemotingClient,RM客户端对象RMClient主要是实例并初始化RmNettyRemotingClient。两者类继承关系如下所示

截屏2023-11-16 21.15.23

NettyRemotingClient 对象主要是在初始化方法中消息处理器并与TC服务端建立长连接,TM与RM注册的消息处理器是不同的,并且RM在与TC建立连接前会先判断数据库资源是否存在。TmNettyRemotingClient与RmNettyRemotingClient都将共同的方法放到抽象父类 AbstractNettyRemotingClient 中 。

父类 AbstractNettyRemotingClient 封装了原生的Netty信息,用于创建Netty客户端对象,并在初始化方法中启动一个周期线程去定期重新发起连接请求

AbstractNettyRemotingClient 的父类 AbstractNettyRemoting 主要是在执行初始化方法时启动 一个周期线程池,每隔3秒检测一次发送的消息集合中是否有消息超时,默认的超时时间为30秒

io.seata.common.DefaultValues#DEFAULT_RPC_RM_REQUEST_TIMEOUT

    long DEFAULT_RPC_RM_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();

我们可以通过设置 transport.rpcRmRequestTimeout (毫秒)去改变这个默认的值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/222505.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

环境搭建及源码运行_java环境搭建_idea版本下载及安装

1、介绍 Idea是一款被广泛使用的Java集成开发环境&#xff0c;它提供了丰富的功能和工具来帮助开发人员更高效地编写和调试代码。作为一款开源软件&#xff0c;Idea不仅提供了基本的代码编辑、自动完成和调试功能&#xff0c;还支持大量的插件和扩展&#xff0c;可为开发人员提…

文件传输软件SecureFX mac支持多种协议

SecureFX mac是一款文件传输客户端&#xff0c;可在 Mac 操作系统上使用。它由 VanDyke Software 公司开发&#xff0c;旨在为用户提供安全、可靠、高效的文件传输服务。 SecureFX 支持多种协议&#xff0c;包括 SFTP、SCP、FTP、FTP over SSL/TLS 和 HTTP/S。它使用强大的加密…

Python代码示例 | 时间序列数据的组成

时间序列数据是以固定的时间间隔记录或收集的数据点序列。它是一种跟踪变量随时间演变的数据&#xff0c;如销售&#xff0c;股票价格&#xff0c;温度等。定期的时间间隔可以是每天&#xff0c;每周&#xff0c;每月&#xff0c;每季度或每年&#xff0c;数据通常表示为线图或…

Sublime Text 3配置 C# 开发环境

Sublime Text 3配置 C# 开发环境 一、引言二、主要内容1. 初识 Sublime Text 32. 初识 C#3. 接入 .NET Framework3.1 下载 .NET Framework3.2 环境变量配置 4. 配置 C# 开发环境5. 编写 C# 代码6. 运行 C# 代码 三、总结 一、引言 C# 是一种面向对象的编程语言&#xff0c;由微…

【华为数据之道学习笔记】6-5数据地图的核心价值

数据供应者与消费者之间往往存在一种矛盾&#xff1a;供应者做了大量的数据治理工作、提供了大量的数据&#xff0c;但数据消费者却仍然不满意&#xff0c;他们始终认为在使用数据之前存在两个重大困难。 1&#xff09;找数难 企业的数据分散存储在上千个数据库、上百万张物理表…

配置https环境

为什么要配置https环境 在使用 HTML5 的 API 时&#xff0c;很多 API 只能在 https 保证安全的情况下才能开启。这就要求我们在本地开发环境也能够配置 https&#xff0c;否则你需要每次部署到配有 https 的测试环境中才能看到预览效果&#xff0c;这对开发的敏捷度造成了极大…

项目进度管理:常用项目管理工具推荐

工欲善其事必先利其器&#xff0c;借助项目管理工具可以帮助项目经理更好的管理项目&#xff0c;起到事半功倍的效果。 使用项目管理工具来管理项目&#xff0c;有助于事情的快速落地&#xff0c;提升做事效率&#xff0c;也能让事情做的更周到全面 选择项目管理工具时可以参…

通过几个基本概念说一下为什么openGauss是当下之选?

Database、Schema、User都是数据库的基本概念&#xff0c;SQL标准中也有明确规范。但不同数据库的具体实现也不尽相同&#xff0c;有些甚至大相径庭。这就导致用户在做国产化选型和数据库迁移时可能会遇到种种困难。本文从这几个基本概念展开&#xff0c;说说为什么openGauss系…

数据结构之进阶二叉树(二叉搜索树和AVL树、红黑树的实现)超详细解析,附实操图和搜索二叉树的实现过程图

绪论​ “生命有如铁砧&#xff0c;愈被敲打&#xff0c;愈能发出火花。——伽利略”&#xff1b;本章主要是数据结构 二叉树的进阶知识&#xff0c;若之前没学过二叉树建议看看这篇文章一篇掌握二叉树&#xff0c;本章的知识从浅到深的对搜索二叉树的使用进行了介绍和对其底层…

linux 内核的 lru_list 的结构

在linux的slab分配的入口slab_alloc有一个传入参数lru&#xff0c;它的作用是使每个slab对象在unused&#xff0c;但可能后面继续使用的时候&#xff0c;不需要free&#xff0c;可以先放在lru_list上。lru_list的结构为&#xff1a; struct list_lru {struct list_lru_node *n…

DiffUtil + RecyclerView 在 Kotlin中的使用

很惭愧, 做了多年的Android开发还没有使用过DiffUtil这样解放双手的工具。 文章目录 1 DiffUtil 用来解决什么问题?2 DiffUtil 是什么?3 DiffUtil的使用4 参考文章 1 DiffUtil 用来解决什么问题? List发生变化, 我们使用 RecyclerView.Adapter.notifyDataChanged很熟练了 …

WiFi+蓝牙物联网定制方案——五大核心难点

WiFi蓝牙物联网定制方案可以根据具体需求进行定制&#xff1a; 1、设备连接方案&#xff1a;采用WiFi和蓝牙技术&#xff0c;将物联网设备与智能手机、平板电脑等设备进行连接&#xff0c;实现数据传输和远程控制。 2、数据传输方案&#xff1a;通过WiFi和蓝牙技术&#xff0c;…

Vue表格中鼠标移入移出input显示隐藏 ,有输入值不再隐藏

Vue表格中鼠标移入移出input显示隐藏 , 不再隐藏的效果 <el-tableref"table":data"tableDatas"borderstyle"width: 100%":span-method"arraySpanMethod"id"table"row-key"id"cell-mouse-enter"editCell&q…

Laravel框架使用phpstudy本地安装的composer用Laravel 安装器进行安装搭建

一、首先需要安装Laravel 安装器 composer global require laravel/installer 二、安装器安装好后&#xff0c;可以使用如下命令创建项目 laravel new sys 三、本地运行 php artisan serve 四、 使用Composer快速安装Laravel5.8框架 安装指定版本的最新版本&#xff08;推荐&a…

C#合并多个Word文档(微软官方免费openxml接口)

g /// <summary>/// 合并多个word文档&#xff08;合并到第一文件&#xff09;/// </summary>/// <param name"as_word_paths">word文档完整路径</param>/// <param name"breakNewPage">true(默认值)&#xff0c;合并下一个…

Linux:ACL 权限控制

ACL 概述 ACL&#xff08;Access Control List&#xff09;&#xff0c;主要作用可以提供除属主、属组、其他人的 rwx 权限之外的 细节权限设定。 ACL 的权限控制 &#xff08;1&#xff09;使用者&#xff08;user&#xff09; &#xff08;2&#xff09;群组&#xff08;grou…

如何使用 Helm 在 K8s 上集成 Prometheus 和 Grafana|Part 1

本系列将分成三个部分&#xff0c;您将学习如何使用 Helm 在 Kubernetes 上集成 Prometheus 和 Grafana&#xff0c;以及如何在 Grafana 上创建一个简单的控制面板。Prometheus 和 Grafana 是 Kubernetes 最受欢迎的两种开源监控工具。学习如何使用 Helm 集成这两个工具&#x…

类和对象

1 类定义&#xff1a; class ChecksumAccumulator {// class definition goes here } 你就能创建 ChecksumAccumulator 对象&#xff1a;new CheckSumAccumulator 注&#xff1a;1scala类中成员默认是public类型&#xff0c;若设为私有属性则必须加private关键字。在scala中是…

NLP论文阅读记录 - | 使用 BRIO 训练范式进行抽象文本摘要

文章目录 前言0、论文摘要一、Introduction二.相关工作三.本文方法四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果标准抽象模型微调抽象模型微调抽象模型和 BRIO微调抽象模型和 BRIO-Loop 五 总结结论局限 前言 Abstractive Text Summarization Using th…

OpenCV | 告别人工目检:深度学习技术引领工业品缺陷检测新时代

文章目录 机器视觉缺陷检测工业上常见缺陷检测方法内容简介作者简介目录读者对象如何阅读本书获取方式 机器视觉 机器视觉是使用各种工业相机&#xff0c;结合传感器跟电气信号实现替代传统人工&#xff0c;完成对象识别、计数、测量、缺陷检测、引导定位与抓取等任务。其中工…