Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

目录

    • 一、Range分区分配策略原理
      • 1.1、Range分区分配策略原理的示例一
      • 1.2、Range分区分配策略原理的示例二
      • 1.3、Range分区分配策略原理的示例注意事项
    • 二、Range 分区分配策略代码案例
      • 2.1、创建带有4个分区的fiveTopic主题
      • 2.2、创建三个消费者 组成 消费者组
      • 2.3、创建生产者
      • 2.4、测试
      • 2.5、Range 分区分配策略代码案例说明
    • 三、Range 分区分配再平衡案例
      • 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 3.3、Range 分区分配再平衡案例说明

一、Range分区分配策略原理

  • Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

1.1、Range分区分配策略原理的示例一

假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C1,C2,C3。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:4/3 = 1 余 1 ,除不尽,那么消费者C1便会多消费1个分区。
    在这里插入图片描述

1.2、Range分区分配策略原理的示例二

假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C1,C2,C3。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C1和C2分别多消费一个分区。
    在这里插入图片描述

1.3、Range分区分配策略原理的示例注意事项

  • 如果只是针对 1 个 topic 而言,C1消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C1都将多消费 1 个分区,topic越多,C1消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

二、Range 分区分配策略代码案例

2.1、创建带有4个分区的fiveTopic主题

  • 在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
    

    在这里插入图片描述

2.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("fiveTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、Range 分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
  • 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

三、Range 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。
    在这里插入图片描述

3.3、Range 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/133725.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PowerShell脚本免杀/bypass/绕过杀毒软件,ReconFTW 漏洞扫描

PowerShell脚本免杀/bypass/绕过杀毒软件&#xff0c;ReconFTW 漏洞扫描。 #################### 免责声明&#xff1a;工具本身并无好坏&#xff0c;希望大家以遵守《网络安全法》相关法律为前提来使用该工具&#xff0c;支持研究学习&#xff0c;切勿用于非法犯罪活动&#…

STM32 FreeRTOS 内存问题

1. STM32L151C8T6 内存&#xff0c;64Kb 的Flash&#xff08;代码就是烧录在这里面的&#xff09;&#xff0c;16Kb 的RAM&#xff0c;程序跑起来之后的内存&#xff0c;相当于我们高考时发的草稿纸&#xff0c;直接影响程序的运行速度&#xff0c;可以用STM32 CubeMx 软件直接…

【Linux】常用工具(上)

Linux 常用工具 一、Linux 软件包管理器 yum1. 软件包2. 查看软件包3. 安装/卸载软件4. yum 其他指令的功能 二、Linux 编辑器 - vim 使用1. vim 的基本概念2. vim 的基本操作&#xff08;1&#xff09;光标移动&#xff08;命令模式&#xff09;&#xff08;2&#xff09;光标…

性能测试之压力测试

文章目录 一.基本介绍二.性能指标三.下载安装JMeter1.下载安装包2.启动JMeter 四.使用JMeter1.模拟用户请求2.填写测试地址3.接收测试结果4.结果解释 一.基本介绍 压力测试考察当前软硬件条件下系统所能承受的最大负荷并找到系统瓶颈所在。压测是为了系统在线上的处理能力和稳定…

Webserver项目解析

一.webserver的组成部分 Buffer类 用于存储需要读写的数据 Channel类 存储文件描述符和相应的事件&#xff0c;当发生事件时&#xff0c;调用对应的回调函数 ChannelMap类 Channel数组&#xff0c;用于保存一系列的Channel Dispatcher 监听器&#xff0c;可以设置为epo…

【数据结构】堆排序详解

文章目录 一、堆排序思想二、向上调整建堆排序三、向下调整建堆排序四、总结 对于什么是堆&#xff0c;堆的概念分类以及堆的向上和向下两种调整算法可见&#xff1a; 堆的创建 一、堆排序思想 int a[] { 2,3,5,7,4,6 };对于这样一个数组来说&#xff0c;要想要用堆排序对它…

学信息系统项目管理师第4版系列07_项目管理知识体系

1. 项目管理原则 1.1. 勤勉、尊重和关心他人 1.1.1. 关键点 1.1.1.1. 关注组织内部和外部的职责 1.1.1.2. 坚持诚信、关心、可信、合规原则 1.1.1.3. 秉持整体观 1.1.2. 职责 1.1.2.1. 诚信 1.1.2.2. 关心 1.1.2.3. 可信 1.1.2.4. 合规 1.2. 营造协作的项目管理团队…

vim,emacs,verilog-mode这几个到底是啥关系?

vim&#xff1a;不多说了被各类coder誉为地表最强最好用的编辑器&#xff1b;gvim&#xff0c;gui vim的意思&#xff1b; emacs&#xff1a;也是一个编辑器&#xff0c;类似vscode&#xff1b; vim在使用的时候为了增强其功能&#xff0c;有好多好多插件&#xff0c;都是以.…

eNSP网络学习

一、eNSP 1.什么是eNSP eNSP(Enterprise Network Simulation Platform)是一款由华为提供的免费的、可扩展的、图形化操作的网络仿真工具平台&#xff0c;主要对企业网络路由器、交换机进行软件仿真&#xff0c;完美呈现真实设备实景&#xff0c;支持大型网络模拟&#xff0c;让…

浅谈C/S vs. B/S的区别

目录 C/S简介: B/S简介&#xff1a; C/S-B/S区别: 1.硬件环境不同: 2.安全要求不同: 3.处理问题不同&#xff1a; 总结: C/S简介: C/S:客户机(Client)/服务器模式(Server)模型中&#xff0c;(C/S是Client/Server的缩写。客户端需要安装专用的客户端软件) 客户端和服务器…

如何在谷某地球飞行模拟中导入简单飞机开发的飞机模型

如何在谷某地球飞行模拟中导入简单飞机开发的飞机模型 简飞的飞友们&#xff01;我并没有弃坑&#xff0c;只不过我不是你们想象的那样设计飞机。我之前写过一篇图文讲解如何在谷某地球里规划飞行航线&#xff1a; 手把手教你驾驶西锐SR-22小飞机在美国大峡谷中穿行https://b…

c语言每日一练(15)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;上学期间将看学业情况更新。 五道选择题&#xff1a; 1、程序运行的结果…

【C++】红黑树插入操作实现以及验证红黑树是否正确

文章目录 前言一、红黑树的插入操作1.红黑树结点的定义2.红黑树的插入1.uncle存在且为红2.uncle不存在3.uncle存在且为黑 3.完整代码 二、是否为红黑树的验证1.IsBlance函数2.CheckColor函数 三、红黑树与AVL树的比较 前言 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在…

驱动轴相机参数设置Web前端界面开发

一、基于Django的Web应用界面的开发&#xff1a; 在Realtimeresults.html上添加一个按钮组件&#xff0c;获取检测到的轴型和车轮信息&#xff0c;点击后可以获取package.json里存放的json数据&#xff0c;效果如下&#xff1a; 实现逻辑&#xff1a;需要从URL设置、视图函数、…

百度千帆大模型文心一言api调用

注册百度智能云账号并申请文心千帆大模型资格 https://login.bce.baidu.com/ https://cloud.baidu.com/product/wenxinworkshop 创建应用用于获取access_token 创建应用成功后,可以获取到API Key和Secret Key 获取access_token curl https://aip.baidubce.com/oauth/2.0/to…

Vue 3的革命性新特性:深入了解Composition API

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

Java毕业设计 SSM SpringBoot 水果蔬菜商城

Java毕业设计 SSM SpringBoot 水果蔬菜商城 SSM 水果蔬菜商城 功能介绍 首页 图片轮播 关键字搜索商品 分类菜单 折扣大促销商品 热门商品 商品详情 商品评价 收藏 加入购物车 公告 留言 登录 注册 我的购物车 结算 个人中心 我的订单 商品收藏 修改密码 后台管理 登录 商品…

【玩玩Vue】使用elementui页面布局和控制页面的滚动

原文作者&#xff1a;我辈李想 版权声明&#xff1a;文章原创&#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、页面布局二、页面滚动1.禁用body的滑动2.禁用el-aside的滚动3.启动el-main的滚动 前言 一、页面布局 这里布局使用vueelementui&…

NotePad——xml格式化插件xml tools在线安装+离线安装

在使用NotePad时&#xff0c;在某些情形下&#xff0c;需要格式化Xml格式内容&#xff0c;可以使用Xml Tools插件。 一、在线安装 1. 打开Notepad 软件 2. 选择插件&#xff0c;选择“插件管理” 3. 搜索 XML Tools&#xff0c;找到该插件后&#xff0c;勾选该文件&#xff…

idea部署javaSE项目(awt+swing项目)/idea导入eclipse的javaSE项目

一.idea打开项目 选择需要部署的项目 二、设置JDK 三、引入数据库驱动包 四、执行sql脚本 四、修改项目的数据库连接 找到数据库连接文件 五.其他系统实现 JavaSwing实现学生选课管理系统 JavaSwing实现学校教务管理系统 JavaSwingsqlserver学生成绩管理系统 JavaSwing用…