Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、分区分配策略基础

二、Range 分区分配策略

(一)原理

(二)案例

(三)Range 分区分配再平衡案例

三、RoundRobin 分区分配策略

(一)原理

(二)案例

(三)RoundRobin 分区分配再平衡案例

四、Sticky 分区分配策略

(一)原理

(二)案例

(三)Sticky 分区分配再平衡案例

五、CooperativeSticky 分区分配策略

六、消费者事务

七、数据积压(消费者如何提高吞吐量)

八、总结


        在 Kafka 的消费任务处理中,分区的分配以及再平衡是至关重要的环节。合理的分区分配策略能够确保消费者高效地处理消息,而理解再平衡机制则有助于应对消费者组在运行过程中的动态变化。本文将深入探讨 Kafka 中不同的分区分配策略,包括 Range、RoundRobin、Sticky 和 CooperativeSticky,以及它们在各种场景下的再平衡表现,并结合实际案例进行详细分析,并对消费者事务和数据积压进行简单介绍。

一、分区分配策略基础

        在一个 Kafka 消费者组中,包含多个消费者,而一个主题则由多个分区组成。关键问题在于确定哪个消费者来消费哪个分区的数据。Kafka 提供了四种主流的分区分配策略,并且可以通过配置参数 partition.assignment.strategy 来修改分区的分配策略,默认策略是 Range + CooperativeSticky。同时,还有一些相关的重要参数:

参数名称

描述

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡

partition.assignment.strategy

消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

二、Range 分区分配策略

(一)原理

        Range 分区分配策略是基于主题的分区数量和消费者数量进行分配。它会按照顺序将连续的分区分配给每个消费者,尽可能平均地分配分区,但可能会导致不同消费者分配到的分区数量不一致。

(二)案例

首先,将主题 first 修改为 7 个分区:

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意,分区数可增加但不能减少,主题的副本数修改需要制定计划执行,不能直接修改。


         由三个消费者 CustomConsumerCustomConsumer1CustomConsumer2 组成消费者组,组名都为 “test”,同时启动这 3 个消费者。


        启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区(修改发送次数为 500 次)。

package com.bigdata.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.235.128:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 500; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first","bigdata " + i), new Callback() {// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(20);}// 5. 关闭资源kafkaProducer.close();}
}

说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

默认是Range,但是在经过一次升级之后,会自动变为CooperativeSticky。这个是官方给出的解释。

默认的分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许通过一次滚动反弹升级到CooperativeStickyAssignor,该滚动反弹会将RangeAssignor从列表中删除。


        观察消费情况,发现一个消费者消费了 5,6 分区,一个消费了 0,1,2 分区,一个消费了 3,4 分区。这是按照 Range 策略分配的结果。

此时并没有修改分区策略,原因是默认是Range.

(三)Range 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),此时 1 号消费者消费到 3、4 号分区数据,2 号消费者消费到 5、6 号分区数据,0 号的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、1、2、3 号分区数据,2 号消费者消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

 

三、RoundRobin 分区分配策略

(一)原理

        RoundRobin 分区分配策略以轮询的方式将分区分配给消费者,确保每个消费者尽可能均衡地获取分区,不考虑主题的因素,只要是消费者组内的分区都会按照轮询顺序分配。

(二)案例

        在 CustomConsumerCustomConsumer1CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin(指定 org.apache.kafka.clients.consumer.RoundRobinAssignor),并修改消费者组为 test2

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerWithFenPei {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");// 指定分区的分配方案properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者订阅主题,主题有数据就会拉取数据// 指定消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("first");// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}修改一下消费者组为test2

 

重启 3 个消费者,重复发送消息步骤并观察分区结果。

 

 

(三)RoundRobin 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5 号分区数据,2 号消费者消费到 4、1 号分区数据,0 号消费者以前对应的数据无人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 0、2、4、6 号分区数据,2 号消费者消费到 1、3、5 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

四、Sticky 分区分配策略

(一)原理

        粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

(二)案例

1)需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察

消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

2)步骤

(1)修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等

会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(3)使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

(三)Sticky 分区分配再平衡案例

        停止掉 0 号消费者,快速重新发送消息(45s 以内),1 号消费者消费到 2、5、3 号分区数据,2 号消费者消费到 4、6 号分区数据,0 号消费者的任务无人顶替。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

        再次重新发送消息(45s 以后),1 号消费者消费到 2、3、5 号分区数据,2 号消费者消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

五、CooperativeSticky 分区分配策略

        CooperativeSticky 是新添加的策略。在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,并且在运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。其好处是实现负载均衡,但多次平衡会浪费性能,它采用动态平衡,在消费过程中实施再平衡,而不是等到某个消费者退出再平衡。

六、消费者事务

        若要实现 Kafka 消费端的精准一次性消费,需要将消费过程和提交 offset 过程做原子绑定。此时可将 Kafka 的 offset 保存到支持事务的自定义介质(如 MySQL),这部分知识将在后续项目中深入涉及,事务具有 ACID 四大特征,例如转账场景(张三 --> 李四)就需要事务的保障来确保数据的准确性和完整性。

七、数据积压(消费者如何提高吞吐量)

        当面临数据积压问题时,消费者可以采取多种方式提高吞吐量,例如增加消费者数量、优化消费者代码处理逻辑、调整相关参数(如 max.poll.interval.ms 等)以适应更高的处理负载等。后续将深入探讨数据积压场景下的优化策略。

八、总结

        通过对 Kafka 分区分配以及再平衡策略的深入理解和实践,可以更好地构建和优化 Kafka 消费任务处理流程,确保系统的高效稳定运行。在实际应用中,需要根据具体的业务需求和场景特点选择合适的分区分配策略,并合理处理再平衡过程中的各种情况。

        消费者事务方面,为实现精准一次性消费,需将消费与提交 offset 原子绑定,可将 offset 存于支持事务的自定义介质如 MySQL 中。在数据积压场景下,消费者可通过增加数量、优化代码处理逻辑、调整参数等方式提高吞吐量,后续会深入探讨相关优化策略。这些知识对于深入理解和优化 Kafka 消费者的性能、可靠性和数据处理准确性具有极为重要的意义,有助于在实际应用中更好地构建和管理基于 Kafka 的系统架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/476699.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Unity Demo]从零开始制作空洞骑士Hollow Knight第二十集:制作专门渲染HUD的相机HUD Camera和画布HUD Canvas

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、制作HUD Camera以及让两个相机同时渲染屏幕二、制作HUD Canvas 1.制作法力条Soul Orb引入库2.制作生命条Health读入数据3.制作吉欧统计数Geo Counter4.制作…

30. 并发编程

一、什么是多任务 如果一个操作系统上同时运行了多个程序&#xff0c;那么称这个操作系统就是 多任务的操作系统&#xff0c;例如&#xff1a;Windows、Mac、Android、IOS、Harmony 等。如果是一个程序&#xff0c;它可以同时执行多个事情&#xff0c;那么就称为 多任务的程序。…

ElasticSearch学习篇17_《检索技术核心20讲》最邻近检索-局部敏感哈希、乘积量化PQ思路

目录 场景在搜索引擎和推荐引擎中&#xff0c;对相似文章去重是一个非常重要的环节&#xff0c;另外是拍照识花、摇一摇搜歌等场景都可以使用它快速检索。 基于敏感性哈希的检索更擅长处理字面上的相似而不是语义上的相似。 向量空间模型ANN检索加速思路 局部敏感哈希编码 随…

mongodb多表查询,五个表查询

需求是这样的&#xff0c;而数据是从mysql导入进来的&#xff0c;由于mysql不支持数组类型的数据&#xff0c;所以有很多关联表。药剂里找药物&#xff0c;需要药剂与药物的关联表&#xff0c;然后再找药物表。从药物表里再找药物与成分关联表&#xff0c;最后再找成分表。 这里…

《机器人控制器设计与编程》考试试卷**********大学2024~2025学年第(1)学期

消除误解&#xff0c;课程资料逐步公开。 复习资料&#xff1a; Arduino-ESP32机器人控制器设计练习题汇总_arduino编程语言 题-CSDN博客 试卷样卷&#xff1a; 开卷考试&#xff0c;时间&#xff1a; 2024年11月16日 001 002 003 004 005 ……………………装………………………

DataWorks快速入门

DataWorks基于MaxCompute、Hologres、EMR、AnalyticDB、CDP等大数据引擎&#xff0c;为数据仓库、数据湖、湖仓一体等解决方案提供统一的全链路大数据开发治理平台。本文以DataWorks的部分核心功能为例&#xff0c;指导您使用DataWorks接入数据并进行业务处理、周期调度以及数据…

0基础跟德姆(dom)一起学AI NLP自然语言处理01-自然语言处理入门

1 什么是自然语言处理 自然语言处理&#xff08;Natural Language Processing, 简称NLP&#xff09;是计算机科学与语言学中关注于计算机与人类语言间转换的领域. 2 自然语言处理的发展简史 3 自然语言处理的应用场景 语音助手机器翻译搜索引擎智能问答...

Python Matplotlib 安装指南:使用 Miniconda 实现跨 Linux、macOS 和 Windows 平台安装

Python Matplotlib 安装指南&#xff1a;使用 Miniconda 实现跨 Linux、macOS 和 Windows 平台安装 Matplotlib是Python最常用的数据可视化工具之一&#xff0c;结合Miniconda可以轻松管理安装和依赖项。在这篇文章中&#xff0c;我们将详细介绍如何使用Miniconda在Linux、mac…

Cmakelist.txt之win-c-udp-client

1.cmakelist.txt cmake_minimum_required(VERSION 3.16) ​ project(c_udp_client LANGUAGES C) ​ add_executable(c_udp_client main.c) ​ target_link_libraries(c_udp_client wsock32) ​ ​ include(GNUInstallDirs) install(TARGETS c_udp_clientLIBRARY DESTINATION $…

移动充储机器人“小奥”的多场景应用(上)

一、高速公路服务区应用 在高速公路服务区&#xff0c;新能源汽车的充电需求得到“小奥”机器人的及时响应。该机器人配备有储能电池和自动驾驶技术&#xff0c;能够迅速定位至指定充电点&#xff0c;为待充电的新能源汽车提供服务。得益于“小奥”的机动性&#xff0c;其服务…

Mono Repository方案与ReactPress的PNPM实践

ReactPress Github项目地址&#xff1a;https://github.com/fecommunity/reactpress 欢迎Star。 Mono Repository方案与ReactPress的PNPM实践 在当今软件开发领域&#xff0c;Mono Repository&#xff08;简称Monorepo&#xff09;已成为一种流行的代码管理方式&#xff0c;特…

人工智能(AI)与机器学习(ML)基础知识

目录 1. 人工智能与机器学习的核心概念 什么是人工智能&#xff08;AI&#xff09;&#xff1f; 什么是机器学习&#xff08;ML&#xff09;&#xff1f; 什么是深度学习&#xff08;DL&#xff09;&#xff1f; 2. 机器学习的三大类型 &#xff08;1&#xff09;监督式学…

ROS之什么是Node节点和Package包?

1.什么是ROS&#xff1f; 官方术语&#xff1a;ROS&#xff08;Robot Operating System&#xff0c;机器人操作系统&#xff09;是一个开源的、模块化的机器人软件框架。它为机器人开发提供了一套工具和库&#xff0c;用于实现硬件抽象、设备驱动、消息传递、多线程管理等功能…

【1.4 Getting Started--->Support Matrix】

主页&#xff1a;支持矩阵 这些支持矩阵概述了 TensorRT API、解析器和层支持的平台、特性和硬件功能。 Support Matrix Abstract 这些支持矩阵概述了 TensorRT API、解析器和层所支持的平台、功能和硬件功能。 有关之前发布的 TensorRT 文档&#xff0c;请参阅 TensorRT 档…

C语言教程指针笔记整理(二)

https://www.bilibili.com/video/BV1cx4y1d7Ut?spm_id_from333.788.videopod.episodes&vd_sourcee8984989cddeb3ef7b7e9fd89098dbe8&p107 本篇为贺宏宏老师C语言教程指针部分笔记整理 //8-19 一维数组和二维数组 // int arr[4] [][][][] //含义&#xff1a; //1.arr…

Java 对象头、Mark Word、monitor与synchronized关联关系以及synchronized锁优化

1. 对象在内存中的布局分为三块区域&#xff1a; &#xff08;1&#xff09;对象头&#xff08;Mark Word、元数据指针和数组长度&#xff09; 对象头&#xff1a;在32位虚拟机中&#xff0c;1个机器码等于4字节&#xff0c;也就是32bit&#xff0c;在64位虚拟机中&#xff0…

Consumer Group

不&#xff0c;kafka-consumer-groups.sh 脚本本身并不用于创建 Consumer Group。它主要用于管理和查看 Consumer Group 的状态和详情&#xff0c;比如列出所有的 Consumer Group、查看特定 Consumer Group 的详情、删除 Consumer Group 等。 Consumer Group 是由 Kafka 消费者…

C语言数据结构——详细讲解 双链表

从单链表到双链表&#xff1a;数据结构的演进与优化 前言一、单链表回顾二、单链表的局限性三、什么是双链表四、双链表的优势1.双向遍历2.不带头双链表的用途3.带头双链表的用途 五、双链表的操作双链表的插入操作&#xff08;一&#xff09;双链表的尾插操作&#xff08;二&a…

ElasticSearch学习篇18_《检索技术核心20讲》LevelDB设计思想

目录 一些常见的设计思想以及基于LSM树的LevelDB是如何利用这些设计思想优化存储、检索效率的。 几种常见的设计思想 索引和数据分离减少磁盘IO读写分离分层思想 LevelDB的设计思想 读写分离设计分层设计与延迟合并LRU缓存加速检索 几种常见设计思想 索引与数据分离 索引…

JavaWeb之综合案例

前言 这一节讲一个案例 1. 环境搭建 然后就是把这些数据全部用到sql语句中执行 2.查询所有-后台&前台 我们先写后台代码 2.1 后台 2.2 Dao BrandMapper&#xff1a; 注意因为数据库里面的名称是下划线分割的&#xff0c;我们类里面是驼峰的&#xff0c;所以要映射 …