Kafka消费者组重平衡(二)

文章目录

    • 概要
    • 重平衡通知机制
    • 消费组组状态
    • 消费端重平衡流程
    • Broker端重平衡流程

概要

上一篇Kafka消费者组重平衡主要介绍了重平衡相关的概念,本篇主要梳理重平衡发生的流程。

为了更好地观察,数据准备如下:
kafka版本:kafka_2.13-3.2.1
控制台创建topic (2个分区1个副本):
bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic test-rebalance

本地启动两个SpringBoot项目实例,代码如下

@KafkaListener(topics = "test-rebalance", groupId = "test-group")
public void rebalanceConsumer(ConsumerRecord<String, String> recordInfo) {int partition = recordInfo.partition();System.out.println("partition:" + partition + " value:" + recordInfo.value());
}

重平衡通知机制

Kafka Java 消费者需要定期地发送心跳请求(Heartbeat Request)到 Broker 端的协调者,以表明它还存活着。在 Kafka 0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是调用 KafkaConsumer.poll 方法的那个线程。
这样的设计存在弊端,一旦消息处理消耗了过长的时间,心跳请求将无法及时发到协调者那里,导致协调者“错误地”认为该消费者已“死”。自 0.10.1.0 版本开始,Kafka引入了一个单独的心跳线程来专门执行心跳请求发送,避免了这个问题。

重平衡的通知机制正是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

消费组组状态

Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,这 5 种状态的含义如下:

状态含义
Empty组内没有任何成员
Dead组内没有任何成员,但组的元数据已经在协调者端删除
PreparingRebalance消费组组准备开启重平衡,此时所有成员都要重新请求加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员正在等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内各成员都能够正常消费数据了

以下是各个状态的流转:

在这里插入图片描述

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于PreparingRebalance 状态等待成员加入,之后变更到CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

创建一个topic并逐步启动两个消费者实例:
服务端日志:

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Empty state. Created a new member id consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 3 (__consumer_offsets-12) (reason: Adding new member consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 4 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 4. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

客户端一启动日志

o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 4
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1, test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1, test-rebalance-0]

有新成员加入后broker日志

INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group test-group in Stable state. Created a new member id consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Preparing to rebalance group test-group in state PreparingRebalance with old generation 4 (__consumer_offsets-12) (reason: Adding new member consumer-1-97eba1d4-7f5c-4d78-b979-ba6ab9c82395 with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) with 2 members (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 0]: Assignment received from leader consumer-1-a6c11c9f-ec26-4e66-adeb-832f699f1247 for group test-group for generation 5. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

接下来启动客户端二

客户端一日志

 o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-0
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=node1:9092 (id: 0 rack: null), epoch=0}}
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-0]

客户端二日志


o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Attempt to heartbeat failed since group is rebalancing
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Revoking previously assigned partitions [test-rebalance-1, test-rebalance-0]
o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions revoked: [test-rebalance-1, test-rebalance-0]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] (Re-)joining group
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Successfully joined group with generation 5
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting newly assigned partitions: test-rebalance-1
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=test-group] Setting offset for partition test-rebalance-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, o.s.k.l.KafkaMessageListenerContainer    : test-group: partitions assigned: [test-rebalance-1]

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

消费端重平衡流程

重平衡的完整流程需要消费者端和协调者组件(什么是协调者)共同参与才能完成。下面先梳理消费者端的重平衡流程。主要分为3各阶段。

第一阶段:确定组协调器
关于如何确定组协调器,参考Kafka消费者重平衡(一)

第二阶段:JoinGroup

在此阶段的消费者会向Group Coordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中选择第一个(通常情况下)加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

在这里插入图片描述

第三阶段:SyncGroup

待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段

在这里插入图片描述
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

Broker端重平衡流程

下面只要从几个常见的场景梳理Broker端重平衡的流程。

新成员加入

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。
具体流程如下:

在这里插入图片描述

组成员主动离组

消费者实例所在线程或进程调用 close() 方法时,就会主动通知协调者要退出组。以下时具体的流程

在这里插入图片描述

组成员崩溃离组

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/132816.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

lvs负载均衡、LVS集群部署

四&#xff1a;LVS集群部署 lvs给nginx做负载均衡项目 218lvs&#xff08;DR 负载均衡器&#xff09; yum -y install ipvsadm&#xff08;安装这个工具来管理lvs&#xff09; 设置VIP192.168.142.120 创建ipvsadm的文件用来存放lvs的规则 定义策略 ipvsadm -C //清空现有…

Java经典问题解答(9题)

文章目录 1、通关jwt靶场的其中任意两关&#xff08;该题与Java无关&#xff09;启动环境第4关第5关第7关 2、java是如何跨平台通信的3、java为什么需要类名和文件名一致4、main函数的作用是什么5、.class文件和.java是什么关系6、java在编写函数的时候void是什么意思7、java声…

慢查询SQL如何优化

一.什么是慢SQL? 慢SQL指的是Mysql中执行比较慢的SQL,排查慢SQL最常用的方法是通过慢查询日志来查找慢SQL。Mysql的慢查询日志是Mysql提供的一种日志记录&#xff0c;它用来记录Mysql中响应时间超过long_query_time值的sql,long_query_time的默认时间为10s. 二.查看慢SQL是否…

模拟信号电压或电流信号转变频器频率传感器信号隔离变送器0-5V/0-10V/0-20mA/4-20mA转0-5KHz/0-10KHz/1-5KHz

主要特性: 精度等级&#xff1a;0.1 级、0.2 级。产品出厂前已检验校正&#xff0c;用户可以直接使用输 入 &#xff1a;0-5V/0-10V/1-5V,0-10mA/0-20mA/4-20mA 等输出信号&#xff1a;0-5KHz/0-10KHz/1-5KHz 等标准信号辅助电源&#xff1a;5V、9V、12V、15V 或 24V 直流单电…

使用branch and bound分支定界算法选择UTXO

BnB算法原理 分支定界算法始终围绕着一颗搜索树进行的&#xff0c;我们将原问题看作搜索树的根节点&#xff0c;从这里出发&#xff0c;分支的含义就是将大的问题分割成小的问题。 大问题可以看成是搜索树的父节点&#xff0c;那么从大问题分割出来的小问题就是父节点的子节点…

怎么裁剪图片?总结了下面几个方法

怎么裁剪图片&#xff1f;在日常的生活中&#xff0c;图片已经成为了我们不可或缺的一部分。或许你正在整理自己的相册时&#xff0c;或者我们需要向互联网上发布一些图片的时候&#xff0c;总之我们随时都可能会遇到一张需要进行裁剪的图片。比如说&#xff0c;一些图片上存在…

每日一博 - 反向代理、API 网关、负载均衡

文章目录 概述图解 概述 反向代理、API网关和负载均衡是在网络和服务器架构中用于不同目的的重要组件&#xff0c;它们有不同的功能和应用场景。以下是它们之间的区别和联系&#xff1a; 反向代理&#xff08;Reverse Proxy&#xff09;&#xff1a; 功能&#xff1a;反向代理…

Xilinx FPGA未使用管脚上下拉状态配置(ISE和Vivado环境)

文章目录 ISE开发环境Vivado开发环境方式1&#xff1a;XDC文件约束方式2&#xff1a;生成选项配置 ISE开发环境 ISE开发环境&#xff0c;可在如下Bit流文件生成选项中配置。 右键点击Generate Programming File&#xff0c;选择Process Properties&#xff0c; 在弹出的窗口选…

分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测

分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测 目录 分类预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost多输入分类预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于SVM-Adaboost支持向量机结合Ada…

软件开发代码审查(review)工具

软件开发代码审查&#xff08;Code Review&#xff09;是一个重要的质量保证实践&#xff0c;旨在发现和修复潜在的问题、缺陷和安全漏洞。为了进行有效的代码审查&#xff0c;开发团队通常使用各种代码审查工具。以下是一些常见的软件开发代码审查工具及其特点&#xff0c;希望…

数据结构之洗牌算法

洗牌算法 1.买一副牌(生成一副牌)2.洗牌3.揭牌完整代码 1.买一副牌(生成一副牌) 2.洗牌 3.揭牌 完整代码 card中的代码: cardDemo中的代码 测试类代码

【日积月累】SpringBoot启动流程

目录 SpringBoot启动流程 1.前言2.构造一个SpringApplication的实例&#xff0c;完成初始化的工作SpringApplication实例构造完之后调用run方法&#xff0c;启动SpringApplication3.SpringBoot启动代码SpringBootConfigurationComponentScanEnableAutoConfiguration 总结参考…

神经网络-pytorch版本

pytorch神经网络基础 torch简介 torch和numpy import torch import numpy as np np_datanp.arange(6).reshape((2,3)) torch_datatorch.from_numpy(np_data) tensor2arraytorch_data.numpy() print(np_data,"\n",torch_data,"\n",tensor2array)torch的数…

竞赛 基于机器视觉的火车票识别系统

文章目录 0 前言1 课题意义课题难点&#xff1a; 2 实现方法2.1 图像预处理2.2 字符分割2.3 字符识别部分实现代码 3 实现效果最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于机器视觉的火车票识别系统 该项目较为新颖&#xff0c;适合作为竞赛…

【Linux学习笔记】 - 常用指令学习及其验证(上)

前言&#xff1a;本文主要记录对Linux常用指令的使用验证。环境为阿里云服务器CentOS 7.9。关于环境如何搭建等问题&#xff0c;大家可到同平台等各大资源网进行搜索学习&#xff0c;本文不再赘述。 由于本人对Linux学习程度尚且较浅&#xff0c;本文仅介绍验证常用指令的常用…

27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2-1)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

7-15 求矩阵的局部极大值

输入格式&#xff1a; 输入在第一行中给出矩阵A的行数M和列数N&#xff08;3≤M,N≤20&#xff09;&#xff1b;最后M行&#xff0c;每行给出A在该行的N个元素的值。数字间以空格分隔。 输出格式&#xff1a; 每行按照“元素值 行号 列号”的格式输出一个局部极大值&#xff0…

事件监听-@TransactionalEventListener与@EventListener的介绍、区别和使用

文章目录 前言事件监听-TransactionalEventListener与EventListener的介绍、区别和使用1. EventListener 是什么?2. TransactionalEventListener 是什么?3. TransactionalEventListener与EventListener的缺点3.1. TransactionalEventListener 的缺点&#xff1a;3.2. EventLi…

2.9 PE结构:重建导入表结构

脱壳修复是指在进行加壳保护后的二进制程序脱壳操作后&#xff0c;由于加壳操作的不同&#xff0c;有些程序的导入表可能会受到影响&#xff0c;导致脱壳后程序无法正常运行。因此&#xff0c;需要进行修复操作&#xff0c;将脱壳前的导入表覆盖到脱壳后的程序中&#xff0c;以…

openGauss学习笔记-69 openGauss 数据库管理-创建和管理普通表-更新表中数据

文章目录 openGauss学习笔记-69 openGauss 数据库管理-创建和管理普通表-更新表中数据 openGauss学习笔记-69 openGauss 数据库管理-创建和管理普通表-更新表中数据 修改已经存储在数据库中数据的行为叫做更新。用户可以更新单独一行、所有行或者指定的部分行。还可以独立更新…