Kafka Producer发送消息流程之分区器和数据收集器

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数if (record.partition() != null)return record.partition();// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据if (partitioner != null) {int customPartition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition < 0) {throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey != null && !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {@Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key == null || !(key instanceof String)) {throw new IllegalArgumentException("Key must be a non-null String");}// Parse the key as an integerint keyInt;try {keyInt = Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException("Key must be a numeric string", e);}// Determine the partition based on the key's odd/even natureif (keyInt % 2 == 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test1","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/380980.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【自动化测试】几种常见的自动化测试框架

在软件测试领域&#xff0c;自动化测试框架有很多&#xff0c;这里主要介绍几种常用的自动化测试框架。 1.pytest pytest 是 Python 的一种单元测试框架&#xff0c;与 Python 自带的 unittest 测试框架类似&#xff0c;但是比 unittest 框架使用起来更简洁&#xff0c;效率更高…

UDP详细总结

UDP协议特点 UDP是无连接的传输层协议&#xff1b; UDP使用尽最大努力交付&#xff0c;不保证可靠交付&#xff1b; UDP是面向报文的&#xff0c;对应用层交下来的报文&#xff0c;不合并&#xff0c;不拆分&#xff0c;保留原报文的边界&#xff1b; UDP没有拥塞控制&#…

[集成学习]基于python的Stacking分类模型的客户购买意愿分类预测

1 导入必要的库 import pandas as pd import numpy as np import missingno as msno import matplotlib.pyplot as plt from matplotlib import rcParams import seaborn as sns from sklearn.metrics import roc_curve, auc from sklearn.linear_model import LogisticRegres…

【C#】计算两条直线的交点坐标

问题描述 计算两条直线的交点坐标&#xff0c;可以理解为给定坐标P1、P2、P3、P4&#xff0c;形成两条线&#xff0c;返回这两条直线的交点坐标&#xff1f; 注意区分&#xff1a;这两条线是否垂直、是否平行。 代码实现 斜率解释 斜率是数学中的一个概念&#xff0c;特别是…

改变你对文本生成程序的误解!用C++标准库,MinGW情况下,写一个文本生成器(一种AI)

声明:我这个不是那种“文本生成器” 我之前见过那种“自动写作文”的程序,无非就是这样的文章: 文章写的只有主题,没有内容 我曾多次向我的朋友提问他们看没看过那种AI写作的代码,而给我的回复很简单:你弄那玩楞干哈?装*?那玩楞我见过,写的文章空有其表,没有其实;…

Java并发04之线程同步机制

文章目录 1 线程安全1.1 线程安全的变量1.2 Spring Bean1.3 如果保证线程安全 2 synchronized关键字2.1 Java对象头2.1.1 对象组成部分2.1.2 锁类型2.1.3 锁对象 2.2 synchronized底层实现2.2.1 无锁状态2.2.2 偏向锁状态2.2.3 轻量级锁状态2.2.4 重量级锁2.2.5 锁类型总结2.2.…

windows USB 设备驱动开发-编写 UCSI 客户端驱动程序

编写 UCSI 客户端驱动程序 USB Type-C 连接or 系统软件接口&#xff08;UCSI&#xff09;驱动程序充当带有嵌入式控制器&#xff08;EC&#xff09;的 USB Type-C 系统的控制器驱动程序。 如果实现平台策略管理器&#xff08;PPM&#xff09;的系统&#xff0c;如 UCSI 规范中…

国产化低功耗HDMI转VGA方案,大量出货产品,广泛应用在显示器以及广告机产品

芯片描述&#xff1a; 兼具高性能和低成本效益的优点&#xff0c;是一款可以将高清视频 HDMI1.4 数字信号转换成 VGA 模拟信号输出的芯片。不需要提供外部电源&#xff0c;ICNM7301 就可以在正常模式下使用&#xff1b;ICNM7301 广 泛适用于各种市场系统和显示应用体系&#x…

LabVIEW异步和同步通信详细分析及比较

1. 基本原理 异步通信&#xff1a; 原理&#xff1a;异步通信&#xff08;Asynchronous Communication&#xff09;是一种数据传输方式&#xff0c;其中数据发送和接收操作在独立的时间进行&#xff0c;不需要在特定时刻对齐。发送方在任何时刻可以发送数据&#xff0c;而接收…

2024年广东省安全员B证第四批(项目负责人)证模拟考试题库及广东省安全员B证第四批(项目负责人)理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年广东省安全员B证第四批&#xff08;项目负责人&#xff09;证模拟考试题库及广东省安全员B证第四批&#xff08;项目负责人&#xff09;理论考试试题是由安全生产模拟考试一点通提供&#xff0c;广东省安全员B证…

手持式气象站:便携科技,掌握微观气象的利器

手持式气象站&#xff0c;顾名思义&#xff0c;是一种可以随身携带的气象监测设备。它小巧轻便&#xff0c;通常配备有温度、湿度、风速、风向、气压等多种传感器&#xff0c;能够实时测量并显示各种气象参数。不仅如此&#xff0c;它还具有数据存储、数据传输、远程控制等多种…

kafka开启kerberos和ACL

作者&#xff1a;恩慈 一、部署kafka-KB包 1&#xff0e;上传软件包 依次点击 部署中心----部署组件----上传软件包 选择需要升级的kafka版本并点击确定 2&#xff0e;部署kafka 依次点击部署中心----部署组件----物理/虚拟机部署----选择集群----下一步 选择手动部署-…

MongoDB自学笔记(四)

一、前文回顾 上一篇文章中我们学习了MongoDB中的更新方法&#xff0c;也学了一部分操作符。今天我们将学习最后一个操作“删除”。 二、删除 原始数据如下&#xff1a; 1、deleteOne 语法&#xff1a;db.collection.deleteOne(< query >,< options >) 具体参…

学生信息管理系统-可视化-科目管理CRUD代码生成器

学生管理系统中的科目管理是一个重要的组成部分&#xff0c;它负责维护和管理学校中所有的教学科目信息。 可视化快速界面生成CRUD界面&#xff0c;API通过代码生成器生成器生成。 新增数据库表 拷贝demo_table修改为clazz_kemu表 修改表结构 其中包括一个自增ID字段&#x…

在虚拟机 CentOS7 环境下安装 MySQL5.7 数据库

配置目标 在虚拟机的 Linux CentOS7 环境下安装 MySQL5.7 版数据库&#xff0c;并能从宿主机 Windows 系统连接该数据库&#xff08;默认端口&#xff1a;3306&#xff09;。 1. 准备工作 WMware 虚拟机&#xff1a;VMware Workstation 16 ProCentOS7 镜像&#xff1a;CentO…

Java面试题--JVM大厂篇之深入解析JVM中的Serial GC:工作原理与代际区别

目录 引言&#xff1a; 正文&#xff1a; 一、Serial GC工作原理 年轻代垃圾回收&#xff08;Minor GC&#xff09;&#xff1a; 老年代垃圾回收&#xff08;Major GC或Full GC&#xff09;&#xff1a; 二、年轻代和老年代的区别 年轻代&#xff08;Young Generation&a…

redis其他类型和配置文件

很多博客只讲了五大基本类型&#xff0c;确实&#xff0c;是最常用的&#xff0c;而且百分之九十的程序员对于Redis只限于了解String这种最常用的。但是我个人认为&#xff0c;既然Redis官方提供了其他的数据类型&#xff0c;肯定是有相应的考量的&#xff0c;在某些特殊的业务…

【C++】——new和delete

文章目录 热身试题C中的内存管理new与delete对于内置类型的操作new与delete对于自定义类型的操作 malloc/free和new/delete的区别 热身试题 int globalVar 1; static int staticGlobalVar 1; void Test() {static int staticVar 1;int localVar 1;int num1[10] { 1, 2, 3…

嵌入式物联网在教育行业的应用——案例分析

作者主页: 知孤云出岫 嵌入式物联网在教育行业的应用——案例分析 目录 作者主页:嵌入式物联网在教育行业的应用——案例分析一、引言二、智能教室&#xff1a;环境监测系统1. 硬件需求2. 电路连接3. 代码实现 三、个性化学习&#xff1a;智能学习平台1. 数据处理与分析2. 代…

Flutter中GetX的用法(超详细使用指南之路由依赖管理篇)

目录 1.前言 2.GetX 依赖管理概述 1.GetX 依赖管理的基本概念 2.与其他依赖管理工具的比较 3. 基础依赖注入 1.Get.put 2.Get.lazyPut 3.Get.putAsync 4.高级依赖注入 1.使用Get.create 2.依赖生命周期管理 5. 参考资料 1.前言 今天这篇博客主要介绍Getx的三大功能…