消息中间件之RocketMQ源码分析(四)

消费者的Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、
Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的正常消费的?

Rebalance服务的类图

在这里插入图片描述

RebalanceImpl的核心属性

在这里插入图片描述

  • ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录MessageQueue和ProcessQueue的关系,MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
  • ConcurrentMap<String, Set< MessageQueue >> topicSubscribeInfoTable:Topic路由信息。保存Topic和MessageQueue的关系
  • ConcurrentMap<String(Topic), SubscriptionData> subscriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
  • AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
  • MQClientInstance mQClientFactory:client实例对象

RebalanceImpl的核心方法

  • boolean lock():为MessageQueue加锁
    在这里插入图片描述

  • void doRebalance():执行Rebalance操作
    在这里插入图片描述

  • void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
    在这里插入图片描述

  • boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
    在这里插入图片描述

  • void dispatchPullRequest():执行消息拉取请求

+

  • boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
    在这里插入图片描述

Rebalance过程

在这里插入图片描述

  • 消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用
    MQClientInstance.rebalanceImmediately()来实现的
    在这里插入图片描述
  • 这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,再执行this.mqClientFactory.doRebalance()
    在这里插入图片描述
  • doRebalance()方法主要有以下几个步骤
    1.查找当前clientId对应的全部的消费者组,全部执行一次Rebalance.
    虽然消费者实现分别为Pull消费和Push消费两种默认实现,调用的是不同实现类的Rebalance方法,但是实现逻辑都差不多
    在这里插入图片描述
    2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
    在这里插入图片描述
    3.在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的Topic,
    循环对每个Topic进行Rebalance.待全部的Rebalance都执行完之后,将不属于当前
    消费者的队列删除
    在这里插入图片描述
    在这里插入图片描述
    4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行MessageQueue重新分配的逻辑,以集群消费为例分析
    在这里插入图片描述
    4.1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll),只有当两者都不为空时,才执行Rebalance
    4.2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。
    由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,
    保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者
    客户端都是一样的试图,做Rebalance时才不会分配错
    4.3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,该接口中,有两个方法allocate()和getName()
    在这里插入图片描述

allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字

目前队列分配策略有五种实现:
AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)
AllocateMessageQueueAveragelyByCircle:环形分配策略
AllocateMessageQueueByConfig:手动配置
AllocateMessageQueueConsistentHash:一致性Hash分配
AllocateMessageQueueByMachineRoom:机房分配策略

4.4.动态更新ProcessQueue,在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管时新增加了队列需要消费,还是减少了队列,都需要执行updateProcessQueueTableInRebalance()方法来更新ProcessQueue,如果有MessageQueue不再分配给当前的消费者消费,则设置ProcessQueue.setDropped(true),表示放其当前MessageQueue的Pull消息
在这里插入图片描述

如果在重新分配MessageQueue后,新增加了MessageQueue,
则添加一个对应的ProcessQueue,查询Queue拉取位点,包装一个新的PullRequest
来Pull消息;同理如果减少了MessageQueue,则将其对应的ProcessQueue删除,
不管MessageQueue时新增还是减少,都会设置changed为True,表示当前消费者
消费的MessageQueue有变化,源码中是分别两个集合遍历来判断是新增还是减少的。
在这里插入图片描述

PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息

在这里插入图片描述
4.5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,
则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,
通知所有Broker,当前订阅关系发生了改变
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/249529.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Springboot项目基础配置:小白也能快速上手!

推荐文章 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;一&#xff09; 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;二&#xff09; 给软件行业带来了春天——揭秘Spring究竟是何方神圣&#xff08;三&#xff09; 给软件行业带来了春天—…

Windows IIS服务如何配置并制作web站点结合内网穿透实现公网访问

文章目录 1. 安装IIS必要WebDav组件2. 客户端测试3. cpolar内网穿透3.1 打开Web-UI管理界面3.2 创建隧道3.3 查看在线隧道列表3.4 浏览器访问测试 4. 安装Raidrive客户端4.1 连接WebDav服务器4.2 连接成功4.2 连接成功总结&#xff1a; 自己用Windows Server搭建了家用NAS主机&…

【思科】 GRE VPN 的实验配置

【思科】GRE VPN 的实验配置 前言报文格式 实验需求配置拓扑GRE配置步骤R1基础配置GRE 配置 ISP_R2基础配置 R3基础配置GRE 配置 PCPC1PC2 抓包检查OSPF建立GRE隧道建立 配置文档 前言 VPN &#xff1a;&#xff08;Virtual Private Network&#xff09;&#xff0c;即“虚拟专…

系统架构17 - 软件工程(5)

软件工程 软件测试测试原则测试方法静态测试动态测试黑盒测试白盒测试灰盒测试自动化测试 测试阶段单元测试集成测试系统测试性能测试验收测试其它测试AB测试Web测试链接测试表单测试 测试用例设计黑盒测试用例白盒测试用例 调试 系统维护遗留系统系统转换转换方式数据转换与迁…

虹科干货 | 如何使用nProbe Cento构建100 Gbit NetFlow 传感器

本文是一份全面的指南&#xff0c;解释了如何使用nProbe Cento构建一个高效的100 Gbit NetFlow传感器。旨在帮助大家充分利用NetFlow技术&#xff0c;以监控和分析高速网络流量。 当需要监控分布式网络&#xff0c;了解流经上行链路或关键网段的网络流量时&#xff0c;NetFlow…

个人建站前端篇(一)项目准备初始化以及远程仓库连接

云风的知识库 云风网前端重构&#xff0c;采用vue3.0vite antd框架&#xff0c;实现前后端分离&#xff0c;实现网站的SEO优化&#xff0c;实现网站的性能优化 vite创建vue项目以及前期准备 Vite 需要 Node.js 版本 18&#xff0c;20。然而&#xff0c;有些模板需要依赖更高…

Arduino Uno R3通过ESP-01S连接网络

一、材料准备 Arduino Uno R3开发板 1 USB串口通信数据线&#xff08;Uno开发板使用&#xff09; 1 ESP8266-01S Wi-Fi模块 1 ESP8266固件烧录下载器&#xff08;烧录固件使用&#xff09; 1 WiFi无线收发转接板&#xff08;适用于ESP-01S、ESP-01&#xff09; 杜邦线…

京津冀科技盛会:2024北京国际智能科技产业展会(世亚智博会)

随着中国经济的快速发展&#xff0c;我们已逐渐从高速增长阶段转向高质量发展阶段。这一转变不仅是经济发展的必然趋势&#xff0c;也是全面建设社会主义现代化国家的首要任务。在这个过程中&#xff0c;数字经济以其高创新性、强渗透性和广覆盖性的特点&#xff0c;成为了构建…

《Docker技术革命:从虚拟机到容器化,全面解析Docker的原理与应用-上篇》

文章目录 Docker为什么会出现总结 Docker的思想Docker历史总结 Docker能干嘛虚拟机技术虚拟机技术的缺点 容器化技术Docker和虚拟机技术的区别 Docker概念Docker的基本组成镜像&#xff08;image)容器&#xff08;container&#xff09;仓科&#xff08;repository&#xff09;…

怎样做好Code Review

Code Review方案 定义 Code Review代码评审是指在软件开发过程中&#xff0c;通过对源代码进行系统性检查的过程。通常的目的是查找各种缺陷&#xff0c;包括代码缺陷、功能实现问题、编码合理性、性能优化等&#xff1b;保证软件总体质量和提高开发者自身水平 code review …

AI绘画探索人工智能的未来

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-8fL64RHWVzwpzR6m {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

C++引用、内联函数、auto关键字介绍以及C++中无法使用NULL的原因

文章目录 一、引用1.1 引用概念1.2 引用特性1.3 常引用1.4 使用场景1.4.1 做参数1.4.2做返回值 1.5 引用和指针的区别1.6 小结一下 二、内联函数2.1 内联的概念2.2 内联的特性2.3 【面试题】 三、auto关键字(C11)3.1 类型别名思考3.2 auto简介 四、auto的使用细则4.1 基于范围的…

海外短剧系统国际短剧源码h5多语言版app挂载tiktok油管ins

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目 前言 一、海外短剧系统是什么&#xff1f; 二、海外短剧系统功能与运营方式介绍 1.系统功能 2.短剧APP运营方式 总结 前言 本文简单介绍海外短剧系统的功能&#xff…

leetcode hot100跳跃游戏Ⅱ

本题和上一题还是有不一样的地方&#xff0c;这个题中&#xff0c;我们需要记录我们跳跃的步数并尽可能的满足最小的跳跃步数到达终点。 那么我们还是采用覆盖范围的概念&#xff0c;但是我们需要两个&#xff0c;一个是在当前位置的覆盖范围&#xff0c;另一个是下一步的覆盖…

SpringCloud Gateway(4.1.0) 返回503:原因分析与解决方案

文章目录 一、环境版本二、原因分析三、解决方案 一、环境版本 Versionspring-cloud-dependencies2023.0.0spring-cloud-starter-gateway4.1.0Nacosv2.3.0 二、原因分析 在 Spring Cloud Gateway 的早期版本中&#xff0c;Ribbon 被用作默认的负载均衡器。随着Spring Cloud的…

Windows 7 x64 SP1 安装 Google Chrome 109.0.5414.120 (正式版本) (64 位)

1 使用 IE 浏览器 输入网址 Google Chrome 网络浏览器得益于 Google 智能工具&#xff0c;Chrome 现在更易用、更安全、更快速。https://www.google.cn/chrome/&#xff0c;点击下载 Chrome。 2 点击 接受并安装。 3 提示。 4 保存。 5 双击 运行 ChromeSetup.exe。 6 等待安…

【Spark系列1】DAG中Stage和Task的划分全流程

一、整体流程 每个Aciton操作会创建一个JOB&#xff0c;JOB会提交给DAGScheduler&#xff0c;DAGScheduler根据RDD依赖的关系划分为多个Stage&#xff0c;每个Stage又会创建多个TaskSet&#xff0c;每个TaskSet包含多个Task&#xff0c;这个Task就是每个分区的并行计算的任务。…

学习鸿蒙基础(2)

arkts是声名式UI DevEcoStudio的右侧预览器可以预览。有个TT的图标可以看布局的大小。和html的布局浏览很像。 上图布局对应的代码&#xff1a; Entry //入口 Component struct Index {State message: string Hello Harmonyos //State 数据改变了也刷新的标签build() {Row()…

PyFlink使用教程,Flink,Python,Java

环境准备 环境要求 Java 11 Python 3.7, 3.8, 3.9 or 3.10文档&#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/ 打开 Anaconda3 Prompt > java -version java version "11.0.22" 2024-01-16 LTS J…

自动驾驶:Apollo如何塑造人类的未来出行

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言1. 什么是自定义指令&#xff1f;2. Apollo中的自定义指令2.1 查询中的自定…