rocketmq-push模式-消费侧重平衡-类流程图分析

1、观察consumer线程

使用arthas分析
把无关线程排除掉

  • MQClientFactoryScheduledThread 定时任务线程
    定时任务线程,包含如下任务:
    每2分钟更新nameServer列表
    每30秒更新topic的路由信息
    每30秒检查broker的存活,发送心跳请求
    每5秒持久化消费队列的offset。如果是广播模式,持久化在本地;如果是集群模式,反馈给broker
    每分钟调整线程池大小(实际上并没有作用。因为最终执行是空方法)

  • PullMessageService 从broker拉取msg的线程。

  • RebalanceService 重平衡线程。每20秒执行一次
    具体可查看org.apache.rocketmq.client.impl.factory.MQClientInstance#start()

2、重平衡,任务是如何创建的

重平衡,就是在消费者组动态伸缩的时候,自动把队列重新分配。具体工作的线程,就是RebalanceService。如下是整个重平衡的类图流程
在这里插入图片描述
如图,启动时,会触发重平衡任务org.apache.rocketmq.client.impl.consumer.RebalanceService#run()
重平衡的关键点在于如何动态伸缩,重点内容在org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

  • 第一步,获取topic对应的队列集合Set<> mqSet
  • 第二步,随机从一个可选broker上,获取所有消费者的集合List cidAll。cid就是消费端的唯一标识。格式如下:“ip@pid#时间戳”,比如127.01.01.01@1723#2926328724786400
  • 第三步,按字段排序mqSet和cidAll。Collections.sort()
if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}
}
  • 第四步、选择重平衡策略
    总共有6种。常用的是AllocateMessageQueueAveragely 平均hash队列算法;AllocateMessageQueueAveragelyByCircle 圆周平均hash队列算法
  • 第五步,判断当前consumer节点是否有伸缩变更。有则创建PullRequest请求体
  • 第六步,将PullRequest请求体put到PullMessageService拉取任务的队列pullRequestQueue

topic的路由信息,是如何更新的

回看上一节的第一步。队列集合Set<> mqSet。在重平衡线程中,是直接获取这个集合的。这个集合,其实是启动时和定时任务线程MQClientFactoryScheduledThread 更新的
每30秒更新topic的路由信息
详细如图:
在这里插入图片描述

总结

首先启动时,会从nameserver获取topic的所有queue。这些queue分布在多个broker上。
构建了mqSet。再随机从一个broker上,获取当前消费者组,包含的所有消费者List<> cidAll。
将两者排序,根据分配策略,分配当前消费者负责的队列。(比如总共12个队列,4个消费者。当前消费者,负责 4,5,6队列)。如此看,是客户端重平衡。通过排序然后策略分配的方式,实现消费者互不通信的条件下协同合作
启动时,内存都是空,所以会触发构建PullRequest请求体。将请求体,put进拉取线程PullMessageService的队列。
每过20秒,会做一次重平衡;
每过30秒,会更新一次路由信息;

后续分析:

  • 拉取线程PullMessageService的工作;
  • 运行过程中,没有重平衡的情况,RebalanceService是不会再创建PullRequest请求体的。如何重复构建PullRequest请求体,循环拉取?(这块代码在PullMessageService中实现)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/496725.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

群落生态学研究进展▌Hmsc包对于群落生态学假说的解读、Hmsc包开展单物种和多物种分析的技术细节及Hmsc包的实际应用

HMSC&#xff08;Hierarchical Species Distribution Models&#xff09;是一种用于预测物种分布的统计模型。它在群落生态学中的应用广泛&#xff0c;可以帮助科学家研究物种在不同环境条件下的分布规律&#xff0c;以及预测物种在未来环境变化下的潜在分布范围。 举例来说&a…

影视仓最新接口+内置本包方法的研究(2024.12.27)

近日喜欢上了研究影视的本地仓库内置&#xff0c;也做了一个分享到了群里。 内置本地仓库包的好处很明显&#xff0c;当前线路接口都是依赖网络上的代码站存放&#xff0c;如果维护者删除那就GG。 虽然有高手制作了很多本地包&#xff0c;但推送本地包到APP&#xff0c;难倒一片…

教育元宇宙的优势与核心功能解析

随着科技的飞速发展&#xff0c;教育领域正迎来一场前所未有的变革。教育元宇宙作为新兴的教育形态&#xff0c;以其独特的优势和丰富的功能&#xff0c;正在逐步改变我们的学习方式。本文将深入探讨教育元宇宙的优势以及其核心功能&#xff0c;为您揭示这一未来教育的新趋势。…

多个微服务 Mybatis 过程中出现了Invalid bound statement (not found)的特殊问题

针对多个微服务的场景&#xff0c;记录一下这个特殊问题&#xff1a; 如果启动类上用了这个MapperScan注解 在resource 目录下必须建相同的 com.demo.biz.mapper 目录结构&#xff0c;否则会加载不到XML资源文件 。 并且切记是com/demo/biz 这样的格式创建&#xff0c;不要使用…

Java基础知识(四) -- 面向对象(下)

1.类变量和类方法 1.1 类变量背景 有一群小孩在玩堆雪人,不时有新的小孩加入,请问如何知道现在共有多少人在玩? 思路分析: 核心在于如何让变量count被所有对象共享 public class Child {private String name;// 定义静态变量(所有Child对象共享)public static int count 0;p…

Linux系统之stat命令的基本使用

Linux系统之stat命令的基本使用 一、stat命令 介绍二、stat命令帮助2.1 查询帮助信息2.2 stat命令的帮助解释 三、stat命令的基本使用3.1 查询文件信息3.2 查看文件系统状态3.3 使用格式化输出3.4 以简洁形式打印信息 四、注意事项 一、stat命令 介绍 stat 命令用于显示文件或文…

雷池 WAF 搭配阿里云 CDN 使用教程

雷池 WAF&#xff08;Web Application Firewall&#xff09;是一款强大的网络安全防护产品&#xff0c;通过实时流量分析和精准规则拦截&#xff0c;有效抵御各种网络攻击。在部署雷池 WAF 的同时&#xff0c;结合阿里云 CDN&#xff08;内容分发网络&#xff09;可以显著提升网…

蓝桥杯速成教程{三}(adc,i2c,uart)

目录 一、adc 原理图​编辑引脚配置 Adc通道使能配置 实例测试 ​编辑效果显示 案例程序 badc 按键相关函数 测量频率占空比 main 按键的过程 显示界面的过程 二、IIC通信-eeprom 原理图AT24C02 引脚配置 不可用状态&#xff0c;用的软件IIC 官方库移植 At24c02手册 ​编辑…

Semantic Segmentation Editor标注工具

https://github.com/Hitachi-Automotive-And-Industry-Lab/semantic-segmentation-editor https://docs.meteor.com/about/install.html https://v2-docs.meteor.com/install.html 安装指定版本的meteor curl https://install.meteor.com/\?release\2.12 | sh ubuntu18 安…

攻防世界web新手第四题easyphp

<?php highlight_file(__FILE__); $key1 0; $key2 0;$a $_GET[a]; $b $_GET[b];if(isset($a) && intval($a) > 6000000 && strlen($a) < 3){if(isset($b) && 8b184b substr(md5($b),-6,6)){$key1 1;}else{die("Emmm...再想想&quo…

vxe-table 实现跨行按钮同时控制两行的编辑状态

vxe-table 写可编辑表格用起来很爽吧&#xff01;有没有遇到下面这种要用一个跨行按钮&#xff0c;控制两行编辑框是否可编辑的情况。是不是官网的方法不好实现了&#xff1f;那么这个应该怎么实现呢。最近刚好碰到这个问题。说下个人的实现思路。 其实也简单&#xff0c;既然官…

ES 磁盘使用率检查及处理方法

文章目录 1. 检查原因2. 检查方法3. 处理方法3.1 清理数据3.2 再次检查磁盘使用率 1. 检查原因 磁盘使用率在 85%以下&#xff0c;ES 可正常运行&#xff0c;达到 85%及以上会影响 PEIM 数据存储。 在 ES 磁盘分配分片控制策略中&#xff0c;为了保护数据节点的安全&#xff0…

论文解读 | EMNLP2024 一种用于大语言模型版本更新的学习率路径切换训练范式

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; 点击 阅读原文 观看作者讲解回放&#xff01; 作者简介 王志豪&#xff0c;厦门大学博士生 刘诗雨&#xff0c;厦门大学硕士生 内容简介 新数据的不断涌现使版本更新成为大型语言模型&#xff08;LLMs&#xff…

【Linux 系统负载详情解析】

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c; 忍不住分享一下给大家。点击跳转到网站 学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、手把…

欲海航舟:探寻天性驱动下的欲望演变与人生驾驭

欲海航舟&#xff1a;探寻天性驱动下的欲望演变与人生驾驭。 欲望之源起&#xff0c;本乎天性。 鸿蒙初辟&#xff0c;生灵乍现&#xff0c;欲望即随人之性灵而生&#xff0c;如花木之根柢&#xff0c;虽隐匿于地下&#xff0c;却为生长之根基。 人之初诞&#xff0c;懵懂无…

WebRTC 环境搭建

主题 本文主要描述webrtc开发过程中所需的环境搭建 环境&#xff1a; 运行环境&#xff1a;ubuntu20.04 Node.js环境搭建 安装编译 Node.js 所需的依赖包: sudo apt-get updatesudo apt-get install -y build-essential libssl-dev下载 Node.js 源码: curl -sL https://…

QT-------认识QT

QT简介 QT是一个跨平台的C图形用户界面应用程序框架&#xff0c;由挪威Trolltech公司于1991年开发并发布。它为开发者提供了一套丰富的类库和工具&#xff0c;用于创建各种类型的应用程序&#xff0c;包括桌面应用、移动应用、嵌入式系统应用等。QT具有高度的可定制性和可扩展…

SpringBoot配置文件、热部署、YAML语法、配置文件值注入

SpringBoot的配置文件 文章目录 SpringBoot的配置文件1.SpringBoot的热部署2.配置文件2.1配置文件的作用2.2YAML配置文件&#xff1a;2.3YAML 与 JSON 和 XML 的对比 3.YAML语法3.1键值对3.2值的写法3.3对象、Map&#xff08;属性和值&#xff09;&#xff08;键值对&#xff0…

基于BiTCN双向时间卷积网络实现电力负荷多元时序预测(PyTorch版)

Bidirectional Temporal Convolutional Network \begin{aligned} &\text{\Large \color{#CDA59E}Bidirectional Temporal Convolutional Network}\\ \end{aligned} ​Bidirectional Temporal Convolutional Network​ Bidirectional Temporal Convolutional Network (BiTC…

【JavaEE】Spring Web MVC

目录 一、Spring Web MVC简介 1.1 MVC简介1.2 Spring MVC1.3 RequestMapping注解1.3.1 使用1.3.2 RequestMapping的请求设置 1.3.2.1 方法11.3.2.2 方法2 二、Postman介绍 2.1 创建请求2.2 界面如下&#xff1a;2.3 传参介绍 一、Spring Web MVC简介 官方文档介绍&#xff…