消费者Rebalance机制

优质博文:IT-BLOG-CN

一、消费者Rebalance机制

Apache Kafka中,消费者组
Consumer Group会在以下几种情况下发生重新平衡Rebalance
【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费者组时,Kafka会触发重新平衡,以重新分配分区给消费者。
【2】消费者崩溃或失去连接: 如果Kafka检测到某个消费者崩溃或失去连接(例如,由于网络问题或消费者进程被终止),它会触发重新平衡。
【3】主题的分区数量发生变化: 如果一个主题的分区数量增加或减少,Kafka会触发重新平衡,以确保新的分区被分配给消费者组中的消费者。
【4】消费者组协调器变更: 消费者组协调器是负责管理消费者组的一个Kafka Broker。如果消费者组协调器发生变更(例如,协调器所在的Broker崩溃),也会触发重新平衡。
【5】消费者组成员发送心跳失败: 消费者需要定期向消费者组协调器发送心跳heartbeat以表明它们仍然活跃。如果心跳失败,协调器会认为该消费者已经失去连接,从而触发重新平衡。

rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance

Kafka在高峰期重平衡rebalancing会导致消费者组的停顿,影响系统的性能和稳定性。为了避免在高峰期发生重平衡,可以采取以下几种策略:
【1】优化分区分配策略: 使用RangeAssignorStickyAssignor等分区分配策略来减少重平衡的频率和影响。

RangeAssignorKafka默认的分区分配策略之一,它将分区按范围分配给消费者。

我们通过一个具体的例子来说明RangeAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么RangeAssignor会重新分配分区,以确保分区尽量按顺序和均匀地分配给剩余的消费者。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,RangeAssignor将分区按顺序重新分配给剩余的消费者,确保每个消费者分配到的分区尽量连续。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,RangeAssignor会再次按顺序和均匀地分配分区。新的分配可能如下:

C1: P0, P1
C3: P2, P3
C4: P4, P5

在这个过程中,RangeAssignor将分区重新分配,以确保每个消费者分配到的分区尽量连续和均匀。

通过这个例子,我们可以看到RangeAssignor的分配策略:
1、将分区按顺序分配给消费者。
2、当消费者组成员变化时,重新分配分区,以确保分区尽量按顺序和均匀地分配给所有消费者。
3、分区分配尽量保持连续性。
这种策略的好处是分区分配简单且稳定,减少了分区在消费者组成员变化时的重新分配范围,从而减少了重平衡的频率和影响。

以下是配置RangeAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class RangeAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 RangeAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}

StickyAssignorKafka 2.4及以上版本引入的一种分区分配策略,它的目标是尽量保持分区分配的稳定性,减少重平衡的频率。

我们通过一个具体的例子来说明StickyAssignor如何分配分区。

假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。

初始分配:假设初始分配如下:

C1: P0, P1
C2: P2, P3
C3: P4, P5

消费者组成员变化:现在假设C2离开了消费者组,那么StickyAssignor会尽量保持现有的分区分配不变,并重新分配C2的分区。新的分配可能如下:

C1: P0, P1, P2
C3: P3, P4, P5

在这个过程中,StickyAssignor尽量保持C1C3的分区分配不变,只是将C2的分区重新分配给其他消费者。

新消费者加入:现在假设有一个新消费者C4加入了消费者组,StickyAssignor会尝试保持现有的分区分配不变,并将分区尽量均匀地分配给所有消费者。新的分配可能如下:

C1: P0, P1
C3: P4, P5
C4: P2, P3

在这个过程中,StickyAssignor保持了C1C3的分区不变,并将C2的分区重新分配给C4

通过这个例子,我们可以看到StickyAssignor的分配策略:
1、尽量保持现有的分区分配不变。
2、当消费者组成员变化时,尽量最小化分区在消费者之间的移动。
3、尽量保持分区分配的平衡性。
这种策略的好处是减少了重平衡带来的影响,提高了分区分配的稳定性,减少了因分区移动带来的数据重新加载和处理的开销。

以下是配置StickyAssignor的代码示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class StickyAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 StickyAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}

或者在配置中进行指定

group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor

【2】增加session.timeout.msheartbeat.interval.ms:增加session.timeout.msheartbeat.interval.ms的值,这样可以减少消费者因为心跳超时而被认为失效,从而触发重平衡。

1、session.timeout.ms是消费者与Kafka broker之间的会话超时时间。如果在这个时间内Kafka broker没有收到某个消费者的心跳,broker就会认为该消费者已经失效,并触发重平衡
2、heartbeat.interval.ms是消费者发送心跳给Kafka broker的时间间隔。心跳是消费者向broker表示自己仍然活跃的方式。

session.timeout.ms=30000
heartbeat.interval.ms=3000

3、heartbeat.interval.ms的值通常要远小于session.timeout.ms的值。这样可以确保在会话超时之前,消费者有多次机会发送心跳。一般建议session.timeout.ms至少是heartbeat.interval.ms10倍,以确保有足够的时间进行多次心跳尝试。

【3】合理配置消费者组:确保消费者组中的消费者数量稳定,避免频繁地增加或减少消费者。尽量在低峰期进行消费者的添加或移除操作。

【4】优化消费者性能:提高消费者的处理能力,确保消费者能够及时处理消息,避免因为处理延迟导致的重平衡。使用异步处理或批量处理来提高消费者的吞吐量。

【5】监控和报警:实时监控Kafka集群和消费者组的状态,设置报警机制,当检测到重平衡风险时,及时采取措施。

【6】使用静态成员Static MembershipKafka 2.3及以上版本支持静态成员功能,可以通过配置group.instance.id来减少重平衡的频率。

group.instance.idKafka 2.4.0引入的一个配置项,用于为每个消费者实例指定一个唯一的标识符。当消费者组中的消费者具有唯一的group.instance.id时,Kafka可以更智能地处理消费者组成员的变化,从而减少不必要的重平衡。

静态成员:通过配置group.instance.id,消费者实例变成了“静态成员”,即使它们暂时断开连接,Kafka也会保留它们的成员身份。这与传统的动态成员(没有group.instance.id)不同,动态成员在断开连接后会被移除,从而触发重平衡。

group.id=my-consumer-group
group.instance.id=consumer-instance-1

【7】调整rebalance.timeout.ms:增加rebalance.timeout.ms的值,确保消费者有足够的时间完成重平衡过程,避免因超时导致的频繁重平衡。

消费者Rebalance分区分配策略

主要包含四种relalance策略:RangeAssignor(范围分配策略),RoundRobinAssignor(轮询分配策略),StickyAssignor(粘性分配策略),CooperativeStickyAssignor(协作粘性分配策略),之前已经讲过两个,这里聊聊剩下的两个

RoundRobinAssignor(轮询分配策略)

RoundRobinAssignor采用轮询的方式将分区分配给消费者。它会将所有分区和消费者按照字典顺序排序,然后依次将每个分区分配给下一个消费者,直到所有分区都被分配完毕。

CooperativeStickyAssignor(协作粘性分配策略)

CooperativeStickyAssignorStickyAssignor的改进版本,它引入了协作重平衡的概念,使得重平衡过程更加平滑,减少了重平衡期间的停顿时间。

二、Rebalance 过程

第一阶段:选择"组协调器"
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer groupcoordinator

第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入groupconsumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。

第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/440379.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人机协作:科技与人类智慧的融合

随着科技的飞速发展&#xff0c;越来越多的领域开始借助人工智能&#xff08;AI&#xff09;和自动化技术来提升工作效率。人机协作&#xff08;Human-Machine Collaboration&#xff09;这一概念逐渐成为现代技术进步的核心。它不仅改变了我们的工作方式&#xff0c;也在重新定…

智能家居有哪些产品?生活中常见的人工智能有哪些?

智能家居有哪些产品? 1、智能照明设备类&#xff1a;智能开关、智能插座、灯控模块、智能空开、智能灯、无线开关。 2、家庭安防类&#xff1a;智能门锁、智能摄像机、智能猫眼、智能门铃。 3、智能传感器类&#xff1a;烟雾传感器、可燃气体传感器、水浸传感器、声光报警器…

舵机驱动详解(模拟/数字 STM32)

目录 一、介绍 二、模块原理 1.舵机驱动原理 2.引脚描述 三、程序设计 main.c文件 servo.h文件 servo.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 舵机(Servo)是在程序的控制下&#xff0c;在一定范围内连续改变输出轴角度并保持的电机系统。即舵机只支持…

【ADC】噪声(1)噪声分类

概述 本文学习于TI 高精度实验室课程&#xff0c;总结 ADC 的噪声分类&#xff0c;并简要介绍量化噪声和热噪声。 文章目录 概述一、ADC 中的噪声类型二、量化噪声三、热噪声四、量化噪声与热噪声对比 一、ADC 中的噪声类型 ADC 固有噪声由两部分组成&#xff1a;第一部分是量…

鸿蒙开发(NEXT/API 12)【穿戴设备传感器获取】手机侧应用开发

手机侧应用可以通过Wear Engine获取穿戴设备上的传感器信息&#xff0c;并通过打开、关闭命令控制获取传感器数据。 使用传感器相关接口前&#xff0c;需要向手机侧用户申请获取对应权限的授权 传感器类型申请权限ECG、PPG、HR[HEALTH_SENSOR]人体传感器ACC、GYRO、MAG[MOTIO…

C#医学影像分析源码,医院影像中心PACS系统源码

医学影像系统源码&#xff0c;影像诊断系统PACS源码&#xff0c;C#语言&#xff0c;C/S架构的PACS系统全套源代码。 PACS系统是医院影像科室中应用的一种系统&#xff0c;主要用于获取、传输、存档和处理医学影像。它通过各种接口&#xff0c;如模拟、DICOM和网络&#xff0c;以…

计算机毕业设计 玩具租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

通信工程学习:什么是RARP反向地址解析协议

RARP&#xff1a;反向地址解析协议 RARP&#xff08;Reverse Address Resolution Protocol&#xff0c;反向地址解析协议&#xff09;是一种网络协议&#xff0c;其主要作用是在设备只知道物理地址&#xff08;如MAC地址&#xff09;时&#xff0c;允许其从网关服务器的地址解析…

LLM prompt提示设计与优化

参看&#xff1a; https://help.aliyun.com/zh/model-studio/use-cases/prompt-engineering-guide?spma2c4g.11186623.0.0.136d55ceDnHbPK https://tenten.co/learning/co-star-tidd-ec-prompt-framework/ 大语言模型中 Prompt 的设计和优化方法&#xff0c;包括使用 Prompt 框…

深度学习中的结构化概率模型 - 使用图来描述模型结构篇

序言 在深度学习的探索之路上&#xff0c;结构化概率模型以其独特的视角和强大的表达能力&#xff0c;成为了研究复杂数据关系的重要工具。这一模型的核心在于其巧妙地利用图来描述模型结构&#xff0c;将随机变量间的复杂交互关系可视化、结构化。图的引入&#xff0c;不仅为…

CPU 多级缓存

在多线程并发场景下&#xff0c;普通的累加很可能错的 CPU 多级缓存 Main Memory : 主存Cache : 高速缓存&#xff0c;数据的读取存储都经过此高速缓存CPU Core : CPU 核心Bus : 系统总线 CPU Core 和 Cache 通过快速通道连接&#xff0c;Main menory 和 Cache 都挂载到 Bus 上…

python爬虫 - 初识requests模块

&#x1f308;个人主页&#xff1a;https://blog.csdn.net/2401_86688088?typeblog &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html 前言 requests 是一个用于发送 HTTP 请求的 Python 库&#xff0c;设计简单且功能强大&am…

强制删除了windows自带的edge浏览器,重装不了怎么办【已解决】

#最近我的edge浏览器出了点问题&#xff0c;点击打不开但是能在下面的任务栏看到他开启了&#xff0c;就是不能够显示在桌面&#xff0c;小窗口叫我配置设置。 我不懂&#xff0c;感觉很烦&#xff0c;就把他强制卸载了。但是windows是不允许将他卸载的&#xff0c;使用window…

Java | Leetcode Java题解之第461题汉明距离

题目&#xff1a; 题解&#xff1a; class Solution {public int hammingDistance(int x, int y) {int s x ^ y, ret 0;while (s ! 0) {s & s - 1;ret;}return ret;} }

二叉树进阶学习——从中序和后续遍历序列构建二叉树

1.题目解析 题目来源&#xff1a;106.从中序和后序遍历序列构造二叉树 测试用例 2.算法原理 后序遍历&#xff1a;按照左子树->右子树->根节点的顺序遍历二叉树&#xff0c;也就是说最末尾的节点是最上面的根节点 中序遍历&#xff1a;按照左子树->根节点->右子树…

Qt操作主/从视图及XML——实例:汽车管理系统

目录 1. 主界面布局2.连接数据库3.主/从视图应用 1. 主界面布局 先创建一个QMainwindow&#xff0c;不带设计界面 #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include <QGroupBox> #include <QTableView> #include <QListWidg…

探索Python文本处理的新境界:textwrap库揭秘

文章目录 **探索Python文本处理的新境界&#xff1a;textwrap库揭秘**一、背景介绍二、textwrap库是什么&#xff1f;三、如何安装textwrap库&#xff1f;四、简单函数使用方法4.1 wrap()4.2 fill()4.3 shorten()4.4 dedent()4.5 indent() 五、实际应用场景5.1 格式化日志输出5…

Study-Oracle-11-ORALCE19C-ADG集群搭建

一、ORACLE--ADG VS ORACLE--DG的区别 1、DG是Oracle数据库的一种灾难恢复和数据保护解决方案&#xff0c;它通过在主数据库和一个或多个备用数据库之间实时复制数据&#xff0c;提供了数据的冗余备份和故障切换功能。它的主要作用是灾难恢复&#xff0c;可以在主数据库发生故…

Html批量转word工具2.1

2024年10月7日记录&#xff1a; 有客户反馈&#xff0c;2.0刚运行就提示转换完成 有问题就解决。正好国庆假期这几天有空&#xff0c;2.1版就出炉了。 2.1 更新记录&#xff1a; 修复了1个bug&#xff1a;刚运行就提示转换完成 下载地址&#xff1a;Html 转 word 批量处理工具…

基于SpringBoot vue3 的山西文旅网java网页设计与实现

博主介绍&#xff1a;专注于Java&#xff08;springboot ssm springcloud等开发框架&#xff09; vue .net php phython node.js uniapp小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设&#xff0c;从业十五余年开发设计教学工作☆☆☆ 精彩专栏推荐订阅☆☆☆☆…