【Kafka】Kafka Producer 分区-05

【Kafka】Kafka Producer 分区-05

  • 1. 分区的好处
  • 2. 分区策略
    • 2.1 默认的分区器 DefaultPartitioner
  • 3. 自定义分区器

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {// 初始化方法@Overridepublic void configure(Map<String, ?> configs) {// 配置代码}// 计算分区的方法@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 分区计算逻辑}// 清理资源的方法@Overridepublic void close() {// 资源清理代码}
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)kafkaProducer.send(new ProducerRecord<>("first",1, "", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0kafkaProducer.send(new ProducerRecord<>("first","a", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区** @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[]keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")) {partition = 0;} else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");KafkaProducer < String, String > kafkaProducer = newKafkaProducer<>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e == null) {System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {e.printStackTrace();}}});}kafkaProducer.close();}
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/350607.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是DMZ?路由器上如何使用DMZ?

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 DMZ 📒🚀 DMZ的应用场景💡 路由器设置DMZ🎈 注意事项 🎈⚓️ 相关链接 ⚓️📖 介绍 📖 在网络管理中,DMZ(Demilitarized Zone,隔离区)是一个特殊的网络区域,常用于将公共访问和内部网络隔离开来。DMZ功能允许…

完美的移动端 UI 风格让客户无可挑剔

完美的移动端 UI 风格让客户无可挑剔

GDB:从零开始入门GDB

目录 1.前言 2.开启项目报错 3.GDB的进入和退出 4.GDB调试中查看代码和切换文件 5.GDB调试中程序的启动和main函数传参 6.GDB中断点相关的操作 7.GDB中的调试输出指令 8.GDB中自动输出值指令 9.GDB中的调试指令 前言 在日常开发中&#xff0c;调试是我们必不可少的技能。在专业…

算法day26

第一题 429. N 叉树的层序遍历 本题的要求我们可以通过队列来辅助完成层序遍历&#xff1b; 如下图的n叉树&#xff1a; 步骤一&#xff1a; 我们定义一个队列&#xff0c;先进行根节点入队列操作&#xff1b; 步骤二&#xff1a; 我们进行当前队列每一个元素的出队列操作&…

功能强大的API函数FindFirstFile使用介绍(附源码)

在处理文件的相关代码中,会频繁使用到Windows系统API函数FindFirstFile,这个函数功能很强大,很多功能都不开它。本文就根据我们在项目中使用该函数的情况,来大概地梳理一下使用FindFirstFile都可以实现哪些常用的功能。 1、FindFirstFile函数声明与WIN32_FIND_DATA结构体 我…

接口测试详解

接口测试详解 本文主要讲软件接口 一、什么是接口&#xff1f;硬件接口&#xff1a;硬件接口指的是硬件提供给外界的一种实体。主要作用是内部数据分离出外 部的沟通方法 目的是&#xff1a;沟通外部来改变内部的数据。如&#xff1a;USB接口&#xff0c;投影仪接口 软件接口…

Hadoop 2.0:主流开源云架构(三)

目录 四、Hadoop 2.0体系架构&#xff08;一&#xff09;Hadoop 2.0公共组件Common&#xff08;二&#xff09;分布式文件系统HDFS&#xff08;三&#xff09;分布式操作系统Yarn&#xff08;四&#xff09;Hadoop 2.0安全机制简介 四、Hadoop 2.0体系架构 &#xff08;一&…

c++使用nlohmann读取json文件

下载&#xff1a; GitHub - nlohmann/json: JSON for Modern C 解压&#xff1a; 包含头文件&#xff1a; 要包含的头文件和要使用的命名空间&#xff1a; #include <nlohmann/json.hpp>using json nlohmann::json; 测试文件&#xff1a; 代码&#xff1a; #include…

Vscode中使用make命令

前言 需要注意&#xff0c;如下操作需要进行网络代理&#xff0c;否则会出现安装失败的情况 安装 第一步 — 安装MingGW &#xff08;1&#xff09;进入官网下载 &#xff08;2&#xff09;下载完成之后&#xff0c;双击exe文件 &#xff08;3&#xff09;点击Install &#x…

远程桌面端口,远程桌面改端口有哪些方法

方法一&#xff1a;通过修改注册表 步骤一&#xff1a;打开注册表编辑器 按下 Windows键R 打开“运行”对话框。输入 regedit 并按 Enter 打开注册表编辑器。 步骤二&#xff1a;定位到远程桌面服务的端口设置 导航至第一个注册表路径&#xff1a;HKEY_LOCAL_MACHINE\SYSTE…

抢占人工智能行业红利,前阿里巴巴产品专家带你15天入门AI产品经理

前言 当互联网行业巨头纷纷布局人工智能&#xff0c;国家将人工智能上升为国家战略&#xff0c;藤校核心课程涉足人工智能…人工智能领域蕴含着巨大潜力&#xff0c;早已成为业内共识。 面对极大的行业空缺&#xff0c;不少人都希望能抢占行业红利期&#xff0c;进入AI领域。…

多线程中run()和start()的区别

我们知道&#xff0c;在多线程中 Thread thread new Thread(runnable); thread.start();以及 thread.run();都可以执行runnable中run方法下的代码&#xff0c;但是二者又有所不同 下面给出一段代码用以体现二者的区别&#xff1a; 以下代码中&#xff0c;通过thread.start()启…

探索互联网寻址机制 | 揭秘互联网技术的核心,解析网络寻址

揭秘互联网技术的核心&#xff0c;解析网络寻址题 前提介绍局域网地址IP地址的分配方式动态IP分配机制内部网&#xff08;intranet&#xff09;ICANN负责IP分配DHCP协议获取IP地址 域名系统域名是什么域名工作方式hosts文件存储域名映射关系DNS分布式数据库DNS域名解析 Java进行…

搭建知识付费APP平台教学:在线教育系统源码详解

如何搭建一个高效的知识付费APP平台呢&#xff1f;今天&#xff0c;笔者将详细解析在线教育系统的源码&#xff0c;帮助您快速搭建自己的知识付费APP平台。 一、平台的核心功能 一个完整的知识付费APP平台通常需要具备以下核心功能&#xff1a; 用户管理 内容管理 支付 课…

【秋招突围】2024届秋招笔试-小红书笔试题-第一套-三语言题解(Java/Cpp/Python)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系计划跟新各公司春秋招的笔试题 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; &#x1f4e7; 清隆这边…

CrossOver 2024软件安装包下载

CrossOver不像Parallels或VMware的模拟器&#xff0c;而是实实在在Mac OS X系统上运行的一个软件。CrossOvers能够直接在Mac上运行Windows软件与游戏&#xff0c;而不需虚拟机。它为Windows软件提供所需的资源&#xff0c;以达到在Mac OS X系统上运行Windows程序的目的。 安 装…

模型 WOOP

说明&#xff1a;系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。不再拖延和懒惰&#xff0c;让梦想照进现实。 1 WOOP模型的应用 1.1 WOOP模型提高自己健身习惯 如果你想要养成健身的习惯&#xff0c;那么使用WOOP模型来提高自己健身习惯&#xf…

【第9章】Vue之Element Plus快速入门

文章目录 前言一、安装1. 兼容性2. 安装 二、按需导入1.自动导入2.Vite 三、全局配置四、官方案例五、效果总结 前言 基于 Vue 3&#xff0c;面向设计师和开发者的组件库。 一、安装 1. 兼容性 Element Plus 目前还处于快速开发迭代中。 由于 Vue 3 不再支持 IE11&#xff0c…

vite-plugin-mock前端自行模拟接口返回数据的插件

vite-plugin-mock前端自行模拟接口返回数据的插件 安装导入、配置&#xff08;vite.config.js&#xff09;使用目录结构/mock/user.js具体在页面请求中的使用 注意事项 中文文档&#xff1a;[https://gitcode.com/vbenjs/vite-plugin-mock/blob/main/README.zh_CN.md) 参考其他…

紫光展锐5G处理器T750__国产手机芯片5G方案

展锐T750核心板采用6nm EUV制程工艺&#xff0c;CPU架构采用了八核设计&#xff0c;其中包括两个主频为2.0GHz的Arm Cortex-A76性能核心和六个主频为1.8GHz的A55小核。这种组合使得T750具备卓越的处理能力&#xff0c;并能在节能的同时提供出色的性能表现。该核心模块还搭载了M…