Rocketmq 探索MQClientFactoryScheduledThread线程工作

线程池包含多个任务

代码如下
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask

private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);}

任务1、更新nameSrvAddr

当nameSrvAddr 部署是云端动态的,比如是aws的。那么需要每两分钟更新一次nameSrvAddr。固定写死的,该任务不会跑。注意源码的if条件。

任务2、每30秒更新一次topicRoute

这个任务,我在之前的文章已有描述,查看rocketmq-push模式-消费侧重平衡-类流程图分析

任务3、每30秒清理失效broker && 发送心跳给ALL broker

  • 清理失效的broker
    具体看org.apache.rocketmq.client.impl.factory.MQClientInstance#cleanOfflineBroker
    目的是更新org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable
    更新的方法是
    org.apache.rocketmq.client.impl.factory.MQClientInstance#isBrokerAddrExistInTopicRouteTable
    通过任务2,更新了org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteTable
    遍历topicRouteTable,查看broker是否还存在,不存在则移除。
    处理流程就是依赖任务2,只需要在内存中做更新

  • 发送心跳给broker
    遍历org.apache.rocketmq.client.impl.factory.MQClientInstance#brokerAddrTable

这里解析下这个成员变量
ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId /, String/ address */>> brokerAddrTable
存储的是一个brokerName,对应的主从集群。master节点,brokerId是0,slave是正数

遍历brokerAddrTable,组装HeartbeatData数据,发送给这些broker。
HeartbeatData由三个成员变量组成。反映该client,生产者group消费者group的信息。

public class HeartbeatData extends RemotingSerializable {private String clientID;private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();

任务4、每5秒持久化一次offset,通知给broker

调用方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#persistAllConsumerOffset
这里,根据消费者实现类的不同,分pull和push的 不同实现

  • push模式
    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#persistConsumerOffset
public void persistConsumerOffset() {try {this.makeSureStateOK();Set<MessageQueue> mqs = new HashSet<MessageQueue>();Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);this.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);}}

从rebalanceImpl上获取MessageQueue

  • pull 模式
    org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl#persistConsumerOffset
public void persistConsumerOffset() {try {checkServiceState();Set<MessageQueue> mqs = new HashSet<MessageQueue>();if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);} else if (this.subscriptionType == SubscriptionType.ASSIGN) {Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues();mqs.addAll(assignedMessageQueue);}this.offsetStore.persistAll(mqs);} catch (Exception e) {log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e);}
}

根据订阅类型不同,获取MessageQueue的方式有不同。后续需要熟悉下SUBSCRIBE和ASSIGN模式的区别

对于集群模式,采用的实现类方法是
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll
该方法就是遍历org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#offsetTable成员变量,处理上一步获取到的MessageQueue
然后往broker的master节点上发送队列的offset。至于主从节点如何同步offset,猜测要在broker上查看。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);}if (findBrokerResult != null) {UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic());requestHeader.setConsumerGroup(this.groupName);requestHeader.setQueueId(mq.getQueueId());requestHeader.setCommitOffset(offset);requestHeader.setBname(mq.getBrokerName());if (isOneway) {this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);}} else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}}

任务5、每分钟调整一次ThreadPool

目前调深入,都是空方法,代码没有作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/502945.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《C++11》各种初始化方式的详细列举与对比

在 C 中&#xff0c;初始化对象的方式多种多样。随着 C 标准的演进&#xff0c;特别是 C11 的引入&#xff0c;初始化方式得到了显著的扩展和改进。本文将详细列举 C 中的各种初始化方式&#xff0c;并对它们进行对比&#xff0c;帮助开发者更好地理解和应用这些特性。 1. C98…

前端小案例——520表白信封

前言&#xff1a;我们在学习完了HTML和CSS之后&#xff0c;就会想着使用这两个东西去做一些小案例&#xff0c;不过又没有什么好的案例让我们去练手&#xff0c;本篇文章就提供里一个案例——520表白信封 ✨✨✨这里是秋刀鱼不做梦的BLOG ✨✨✨想要了解更多内容可以访问我的主…

【Vim Masterclass 笔记05】第 4 章:Vim 的帮助系统与同步练习(L14+L15+L16)

文章目录 Section 4&#xff1a;The Vim Help System&#xff08;Vim 帮助系统&#xff09;S04L14 Getting Help1 打开帮助系统2 退出帮助系统3 查看具体命令的帮助文档4 查看帮助文档中的主题5 帮助文档间的上翻、下翻6 关于 linewise7 查看光标所在术语名词的帮助文档8 关于退…

10-C语言项目池

C语言项目池 《个人通讯录》 《火车订票系统》 管理员用户1录入火车票信息区间查询/购票2显示火车票信息打印购票信息3查询火车票信息退票4修改火车票信息5添加火车票信息 《学生学籍管理系统》 1录入学生信息2添加学生信息3显示学生信息4查找学生信息5删除学生信息6修改学…

Android 绘制学习总结

1、刷新率介绍 我们先来理一下基本的概念&#xff1a; 1、60 fps 的意思是说&#xff0c;画面每秒更新 60 次 2、这 60 次更新&#xff0c;是要均匀更新的&#xff0c;不是说一会快&#xff0c;一会慢&#xff0c;那样视觉上也会觉得不流畅 3、每秒 60 次&#xff0c;也就是 1…

每日一题:BM1 反转链表

文章目录 [toc]问题描述数据范围示例 C代码实现使用栈实现&#xff08;不符合要求&#xff0c;仅作为思路&#xff09; 解题思路 - 原地反转链表步骤 C语言代码实现 以前只用过C刷过代码题目&#xff0c;现在试着用C语言刷下 问题描述 给定一个单链表的头结点 pHead&#xff…

78、使用爱芯派2_AX630C开发板 3.2T高有效算力 低功耗 支持AI-ISP真黑光实验

基本思想:使用爱心元智最新的版本开发板进行实验 AX630C、AX620Q 都是 620E 这一代 一、参考这个官方教程,先把代码在本地交叉编译完成 https://github.com/AXERA-TECH/ax620e_bsp_sdk 然后在拷贝到620c设备上 root@ax630c:~/ax620e_bsp_sdk/msp/out/arm64_glibc/bin# ./…

【Redis经典面试题七】Redis的事务机制是怎样的?

目录 一、Redis的事务机制 二、什么是Redis的Pipeline&#xff1f;和事务有什么区别&#xff1f; 三、Redis的事务和Lua之间有哪些区别&#xff1f; 3.1 原子性保证 3.2 交互次数 3.3 前后依赖 3.4 流程编排 四、为什么Lua脚本可以保证原子性&#xff1f; 五、为什么R…

企业网络性能监控

什么是网络性能监控 网络性能监控&#xff08;NPM&#xff09;是指对计算机网络的性能进行持续测量、分析和管理的过程&#xff0c;通过监控流量、延迟、数据包丢失、带宽利用率和正常运行时间等关键指标&#xff0c;确保网络高效、安全地运行&#xff0c;并将停机时间降至最低…

【开源】创建自动签到系统—QD框架

1. 介绍 QD是一个 基于 HAR 编辑器和 Tornado 服务端的 HTTP 定时任务自动执行 Web 框架。 主要通过抓包获取到HAR来制作任务模板&#xff0c;从而实现异步响应和发起HTTP请求 2. 需要环境 2.1 硬件需求 CPU&#xff1a;至少1核 内存&#xff1a;推荐 ≥ 1G 硬盘&#xff1a;推…

SUB输入5V升压充电16.8V芯片HU5912

HU5912芯片&#xff0c;作为航誉微电子有限公司推出的一款高性能升压充电管理IC&#xff0c;自其面世以来&#xff0c;便以其出色的性能和广泛的应用领域&#xff0c;受到了业界的高度关注和赞誉。本文将详细介绍HU5912芯片的技术特点、应用优势、市场定位以及其在各类电子设备…

练习(继承)

大家好&#xff0c;今天我们写几道题来巩固一下我们所学的知识&#xff0c;以便我们更好的学习新内容。 方法重写&#xff1a; 继承&#xff1a; 注&#xff1a;java中只能继承一个类 那么今天分享就到这里&#xff0c;谢谢大家&#xff01;&#xff01;&#xff01;

计算机网络 (28)虚拟专用网VPN

前言 虚拟专用网络&#xff08;VPN&#xff09;是一种在公共网络上建立私有网络连接的技术&#xff0c;它允许远程用户通过加密通道访问内部网络资源&#xff0c;实现远程办公和安全通信。 一、基本概念 定义&#xff1a;VPN是一种通过公共网络&#xff08;如互联网&#xff09…

04-spring-理-ApplicationContext的实现

实现1&#xff1a;ClassPathXmlApplicationContext 1、内部维护了 DefaultListableBeanFactory 2、通过XmlBeanDefinitionReader 读取配置文件将结果加入到 DefaultListableBeanFactory 3、没有维护 bean后置处理器 &#xff0c;可以通过在xml配置 <context:annotation-c…

STM32的LED点亮教程:使用HAL库与Proteus仿真

学习目标&#xff1a;掌握使用STM32 HAL库点亮LED灯&#xff0c;并通过Proteus进行仿真验证&#xff01; 建立HAL库标准工程 1.新建工程文件夹 新建工程文件夹建议路径尽量为中文。建立文件夹的目的为了更好分类去管理项目工程中需要的各类工程文件。 首先需要在某个位置建立工…

回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测

回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测 目录 回归预测 | MATLAB实ELM-Adaboost多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 一、极限学习机&#xff08;ELM&#xff09; 极限学习机是一种单层前馈神经网络&#xff0c;具有训练速…

实现AVL树

目录 AVL树概念 AVL树结构 AVL树插入 LL型 - 右单旋 RR型 - 左单旋 LR型 - 左右双旋 RL型 - 右左双旋 插入代码实现 AVL树测试 附AVL树实现完整代码 AVL树概念 前面的博客介绍了搜索二叉树&#xff0c;二叉搜索树-CSDN博客 在某些特定的情况下&#xff0c;⼆叉搜索树…

unity学习11:地图相关的一些基础

目录 1 需要从 unity的 Asset Store 下载资源 1.1 下载资源 1.2 然后可以从 package Manager 里选择下载好的包&#xff0c;import到项目里 2 创建地形 2.1 创建地形 2.2 地形 Terrain大小 2.3 各种网格的尺寸大小 2.4 比较这个地形尺寸和创建的其他物体的大小对比 3 …

【vue】晋升路线图、蛇形进度条

一、效果图&#xff08;参考链接&#xff09; 代码实现 <template><div class"only-content"><h1 class"text-center my-3">讲师晋升路线</h1><!--时间轴线显示--><div class"time-line"><div class&qu…

VisionPro软件Image Stitch拼接算法

2D图像拼接的3种情景 1.一只相机取像位置固定&#xff0c;或者多只相机固定位置拍图&#xff0c;硬拷贝拼图&#xff0c;采用CopyRegion工具实现 2.一只或多只相机在多个位置拍照&#xff0c;相机视野互相重叠&#xff0c;基于Patmax特征定位后&#xff0c;无缝 拼图&#xff…