Kafka-消费者-KafkaConsumer分析-ConsumerCoordinator

在前面介绍了Kafka中Rebalance操作的相关方案和原理。

在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端的GroupCoordinator的交互,ConsumerCoordinator继承了AbstractCoordinator抽象类。

下面我们先来介绍AbstractCoordinator的核心字段,如图所示。

在这里插入图片描述- heartbeat:心跳任务的辅助类,其中记录了两次发送心跳消息的间隔(interval字段)、最近发送心跳的时间(lastHeartbeatSend字段)、最后收到心跳响应的时间(lastHeartbeatReceive字段)、过期时间(timeout字段)、心跳任务重置时间(lastSessionReset字段),同时还提供了计算下次发送心跳的时间(timeToNextHeartbeat()方法)、检测是否过期的方法(sessionTimeoutExpired()方法)。

  • heartbeatTask:HeartbeatTask是一个定时任务,负责定时发送心跳请求和心跳响应的处理,会被添加到前面介绍的ConsumerNetworkClient.delayedTasks定时任务队列中。
  • groupld:当前消费者所属的Consumer Group的Id。
  • client:ConsumerNetworkClient对象,负责网络通信和执行定时任务。
  • needsJoinPrepare:标记是否需要执行发送JoinGroupRequest请求前的准备操作。
  • rejoinNeeded:此字段是否重新发送JoinGroupRequest请求的条件之一。

下面先简单了解修改其值的地方和含义,如图所示。

在这里插入图片描述
上图①处是收到正常的JoinGroupResponse响应,将rejoinNeeded设置为false,防止重复发送JoinGroupRequest请求。

②、③、④三处分别是收到异常的SyncGroupResponse或HeartbeatResponse或消费者离开Consumer Group时执行的操作,它们都会将rejoinNeeded设置为true,表示可以重新发送JoinGroupRequest。

  • coordinator:Node类型,记录服务端GroupCoordinator所在的Node节点。
  • memberld:服务端GroupCoordinator返回的分配给消费者的唯一Id。
  • generation:服务端GroupCoordinator返回的年代信息,用来区分两次Rebalance操作。由于网络延迟等问题,在执行Rebalance操作时可能收到上次Rebalance过程的请求,避免这种干扰,每次Rebalance操作都会递增generation的值。

下面是ConsumerCoordinator的核心字段。

  • assignors:PartitionAssignor列表。在消费者发送的JoinGroupRequest请求中包含了消费者自身支持的PartitionAssignor信息,GroupCoordinator从所有消费者都支持的分配策略中选择一个,通知Leader使用此分配策略进行分区分配。此字段的值通过partition.assignment.strategy参数配置,可以配置多个。

  • metadata:记录了Kafka集群的元数据。

  • subscriptions:SubscriptionState对象,参考SubscriptionState小节。

  • autoCommitEnabled:是否开启了自动提交offset。

  • autoCommitTask:自动提交offset的定时任务。

  • interceptors:ConsumerInterceptor集合。

  • excludeInternalTopics:标识是否排除内部Topic。

  • metadataSnapshot:用来存储Metadata的快照信息,主要用来检测Topic是否发生了分区数量的变化。在ConsumerCoordinator的构造方法中,会为Metadata添加一个监听器,当Metadata更新时会做下面几件事。

    • 如果是AUTO_PATTERN模式,则使用用户自定义的正则表达式过滤Topic,得到需要订阅的Topic集合后,设置到SubscriptionState的subscription集合和groupSubscription集合中。

    • 如果是AUTO_PATTERN或AUTO_TOPICS模式,为当前Metadata做一个快照,这个快照底层是使用HashMap记录每个Topic中Partition的个数。将新旧快照进行比较,发生变化的话,则表示消费者订阅的Topic发生分区数量变化,则将SubscriptionState的needsPartitionAssignment字段置为true,需要重新进行分区分配。

    • 使用metadataSnapshot字段记录变化后的新快照。

  • assignmentSnapshot:也是用来存储Metadata的快照信息,不过是用来检测Partition分配的过程中有没有发生分区数量变化。具体是在Leader消费者开始分区分配操作前,使用此字段记录Metadata快照;收到SyncGroupResponse后,会比较此字段记录的快照与当前Metadata是否发生变化。如果发生变化,则要重新进行分区分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/240447.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UML-活动图

提示:大家可以参考我的状态图博客 UML-活动图 一、活动图的基本概念1.开始状态和结束状态2.动作状态和活动状态(活动)3.分支与合并4.分叉与合并5.活动转换(1)转移(2)判定 6.泳道 二、活动图的例…

数据结构:顺序栈

栈是一种先进后出的数据结构,只允许在一端(栈顶)操作,代码中top表示栈顶。 stack.h /* * 文件名称:stack.h * 创 建 者:cxy * 创建日期:2024年01月17日 * 描 述: …

数据分片概述、环境准备、部署MyCAT服务、全局表、分片表、ER表

1 案例1:部署mycat服务 1.1 问题 把主机mysql60 配置为 MySQL59 的从服务器把主机mysql62 配置为 MySQL61 的从服务器把主机mycat63 配置为mycat服务器客户端192.168.88.50访问mycat服务 1.2 方案 准备6台虚拟机,具体配置如表-1 1.3 步骤 实现此案例…

Docker中创建并配置MySQL、nginx、redis等容器

Docker中安装并配置MySQL、nginx、redis等 文章目录 Docker中安装并配置MySQL、nginx、redis等一、创建nginx容器①:拉取镜像②:运行nginx镜像③:从nginx容器中映射nginx配置文件到本地④:重启nginx并重新配置nginx的挂载 二、创建…

React全局状态管理

redux是一个状态管理框架,它可以帮助我们清晰定义state和处理函数,提高可读性,并且redux中的状态是全局共享,规避组件间通过props传递状态等操作。 快速使用 在React应用的根节点,需要借助React的Context机制存放整个…

新定义51单片机(RD8G37)实现测距测速仪

本文描述用新定义51单片机(RD8G37)超声波一体测距传感器实现简单的测距测速仪。 测距仪演示效果 新定义RD8G37Q48RJ开发板 超声波测距模块: 8位并口屏 1、main.c unsigned short timeConsuming0; unsigned int oldDistance;void rectClearS…

jeecgboot 前端bug or 后端 看图

无法显示文本 只能显示value 很恶心 如果用 varchar 就可以 不知道有没有别的方式 用int 解决 ,可能是我没有发现好的方法

Peter算法小课堂—并查集

我们先来看太戈编程467题 攀亲戚 题目描述: 最近你发现自己和古代一个皇帝长得很像:都有两个鼻子一个眼睛,你想知道这皇帝是不是你的远方亲戚,你是不是皇亲国戚。目前你能掌握的信息有m条,关于n个人:第i条…

手机崩溃日志的查找与分析

手机崩溃日志的查找与分析 摘要 本文介绍了一款名为克魔助手的iOS应用日志查看工具,该工具可以方便地查看iPhone设备上应用和系统运行时的实时日志和崩溃日志。同时还提供了崩溃日志的分析查看模块,可以对苹果崩溃日志进行符号化、格式化和分析&#x…

yolov5训练自己的数据

目录 1. 环境搭建2. 数据准备3. 数据标注4. 数据整理4.1 数据集切分4.2 修改数据文件4.3 修改模型文件 5. 训练模型5.1 训练5.2 验证5.3 测试 6. 训练结果分析 1. 环境搭建 安装anaconda、python、 cuda、 cudnn、 pytoch、 torchvision、 torchaudio等等。这里不详述 2. 数据…

软件测试大作业||测试计划+测试用例+性能用例+自动化用例+测试报告

xxx学院 2023—2024 学年度第二学期期末考试 《软件测试》(A)试题(开卷) 题目:以某一 web 系统为测试对象,完成以下文档的编写: (满分 100 分) (1&am…

量化研究员!你应该如何写一手好代码

即使是Quant Researcher, 写一手高质量的代码也是非常重要的。再好的思路,如果不能正确地实现,都是没有意义的。 写一手高质量的代码的意义,对Quant developer来讲就更是自不待言了。这篇笔记就介绍一些python best practice。 始…

QT第二周周三

题目&#xff1a;使用图片绘制出仪表盘 代码&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *paren…

申请开启|成为亚马逊云科技 Community Builder,共建云端社区!

在探索由技术打造的云端世界时&#xff0c;和同行者一起学习&#xff0c;与技术专家共同探讨是开发者成长的最佳助力&#xff01; 亚马逊云科技开发者社区 Community Builders 为技术爱好者和新兴思想领袖提供技术资源、学习和交流机会&#xff0c;帮助开发者探索、分享技术相关…

【车载HMI开发工具--EB GUIDE 与 Unity 合作提供一体化的沉浸式 HMI 设计开发工具链】【转载】

随着车载高性能计算平台的日益普及以及显示器尺寸和数量的不断增加&#xff0c;沉浸式车载人机交互界面&#xff08;HMI&#xff09;的需求也在持续增长。为了将实时 3D 技术带入车载 HMI 领域&#xff0c;Unity 与 Elektrobit (EB)展开了合作&#xff0c;EB 是推进 HMI 功能安…

CC工具箱使用指南:【添加字段(批量)】

一、简介 Arcgis中添加字段是常用的一个操作&#xff0c;软件中也自带有添加字段工具。 如果要给一个要素或表批量添加字段&#xff0c;可以用迭代器或批处理。 但如果理复杂一点&#xff0c;有多个GDB要素、表格&#xff0c;或者是SHP文件&#xff0c;需要给这个要素或表添…

Git将某个文件合并到指定分支

企业开发中&#xff0c;经常会单独拉分支去做自己的需求开发&#xff0c;但是某些时候一些公共的配置我们需要从主线pull&#xff0c;这时候整个分支merge显然不合适 1.切换至待合并文件的分支 git checkout <branch>2.将目标分支的单个文件合并到当前分支 git checkou…

.NET国产化改造探索(三)、银河麒麟安装.NET 8环境

随着时代的发展以及近年来信创工作和…废话就不多说了&#xff0c;这个系列就是为.NET遇到国产化需求的一个闭坑系列。接下来&#xff0c;看操作。 上一篇介绍了如何在银河麒麟操作系统上安装人大金仓数据库&#xff0c;这篇文章详细介绍下在银河麒麟操作系统上安装.NET8环境。…

最新 生成pdf文字和表格

生成pdf文字和表格 先看效果 介绍 java项目&#xff0c;使用apache的pdfbox工具&#xff0c;可分页&#xff0c;自定义列 依赖 <dependency><groupId>org.apache.pdfbox</groupId><artifactId>pdfbox</artifactId><version>2.0.22<…

第一讲_HarmonyOS应用开发环境准备

HarmonyOS应用开发环境准备 1. 知识储备2. 环境搭建2.1 安装node.js2.2 配置node.js2.3 安装命令行工具2.4 安装DevEco Studio2.5 配置DevEco Studio 1. 知识储备 HarmonyOS提供了一套UI开发框架&#xff0c;即方舟开发框架&#xff08;ArkUI框架&#xff09;。方舟开发框架可…