Kafka自定义分区机制

文章目录

  • 1.如何自定义分区机制
  • 2.示例


1.如何自定义分区机制

若需要使用自定义分区机制,需要完成两件事:
1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中实现。
2)在用于构造KafkaProducer的Properties对象中设置 partitioner.class 参数。

2.示例

假设我们的消息中有一些消息是用于审计功能的,这类消息的 key 会被固定地分配一个字符串“audit”。我们想要让这类消息发送到 topic 的最后一个分区上,便于后续统一处理,而对于相同 topic 下的其他消息则采用随机发送的策略发送到其他分区上。那么现在就可以这样来实现自定义的分区策略,如下列代码所示:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class AuditPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> map) {//该方法实现必要资源的初始化工作random= new Random();}@Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object valueObj, byte[] valueBytes, Cluster cluster) {String key=(String)keyObj;//从集群元数据中把属于该topic的所有分区信息都读取出供分区策略使用List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);int partitionCount =partitionInfoList.size();int auditPartition=partitionCount-1;return key == null|| key.isEmpty()|| !key.contains ("audit")?random.nextInt(partitionCount-1):auditPartition;}@Overridepublic void close() {//该方法实现必要资源的清理工作}
}

创建好自定义分区策略类后,在构建KafkaProducer 之前为Properties增加该属性;代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.custompartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord nonKeyRecord = new ProducerRecord("topic-test","non-key record");ProducerRecord auditRecord = new ProducerRecord("topic-test", "audit","audit record");ProducerRecord nonAuditRecord =new ProducerRecord("topic-test","other","non-sudit record");producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.send(auditRecord).get();producer.send(nonKeyRecord).get();producer.send(nonAuditRecord).get();producer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37313.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux安装Elasticsearch集群-----docker安装es集群

目录 技术背景 1.2 实验目标 二、实验内容 1.1 服务器规划 二、传统方式安装Elasticsearch集群 2.1 安装Java环境&#xff08;10.1.1.6/8&#xff09; 2.3 配置集群节点&#xff08;以10.1.1.6&#xff09; 2.4 启动服务 ES Data节点1&#xff08;10.1.1.8&#xff09;…

【嵌入式】复刻SQFMI开源的Watchy墨水屏电子表——(2)软件部分

书接上文 基于乐鑫 ESP32-PICO-D4 模块的墨水屏智能手表开源项目Watchy 完成了硬件部分&#xff0c;接下来就是软件部分&#xff1a; 一 开发环境配置&#xff08;Arduino ESP32&#xff09; 首先需要进行 Arduino ESP32 开发环境的安装配置&#xff0c;过程参考之前的帖子&a…

关于微信小程序端base64解码问题

由于atob是浏览器端的&#xff0c;对于微信小程序不支持&#xff0c;导致模拟器【开发工具】显示正常&#xff0c;但真机异常解析失败问题&#xff0c;微信小程序原有的api&#xff0c;官方文档中也废弃了 解决方案&#xff1a; 调用&#xff1a; const decodedString ba…

如何通过Odoo 18创建与配置服务器操作

如何通过Odoo 18创建与配置服务器操作 服务器操作是Odoo实现业务流程自动化的核心工具&#xff0c;允许你在服务器端执行自动化任务&#xff0c;通常由按钮点击或自动化工作流等事件触发。这些操作使用 Python 编写&#xff0c;能够执行复杂的业务逻辑&#xff0c;从而增强 Od…

Windows主机、虚拟机Ubuntu、开发板,三者之间文件互传

以下内容源于日常学习的整理&#xff0c;欢迎交流。 下图是Windows主机、虚拟机Ubuntu、开发者三者之间文件互传的方式示意图&#xff1a; 注意&#xff0c;下面谈及的所有方式&#xff0c;都要求两者的IP地址处于同一网段&#xff0c;涉及到的软件资源见felm。 一、Windows主…

[设计模式与源码]1_Spring三级缓存中的单例模式

欢迎来到啾啾的博客&#x1f431;&#xff0c;一个致力于构建完善的Java程序员知识体系的博客&#x1f4da;&#xff0c;记录学习的点滴&#xff0c;分享工作的思考、实用的技巧&#xff0c;偶尔分享一些杂谈&#x1f4ac;。 欢迎评论交流&#xff0c;感谢您的阅读&#x1f604…

微服务架构中的API网关:Spring Cloud与Kong/Traefik等方案对比

微服务架构中的API网关&#xff1a;Spring Cloud与Kong/Traefik等方案对比 一、API 网关的概念二、API 网关的主要功能2.1 统一入口与路由转发2.2 安全与权限控制2.3 流量管理与容错2.4 API 管理与聚合2.5 监控与日志2.5 协议转换与适配2.6 控制平面与配置管理 三、API 网关选型…

中兴B860AV3.2-T/B860AV3.1-T2_S905L3-B_2+8G_安卓9.0_先线刷+后卡刷固件-完美修复反复重启瑕疵

中兴电信B860AV3.2-T&#xff0f;B860AV3.1-T2_晶晨S905L3-B芯片_28G_安卓9.0_先线刷后卡刷-刷机固件包&#xff0c;完美修复刷机后盒子反复重启的瑕疵。 这两款盒子是可以通刷的&#xff0c;最早这个固件之前论坛本人以及其他水友都有分享交流过不少的固件&#xff0c;大概都…

Stable Diffusion lora训练(一)

一、不同维度的LoRA训练步数建议 2D风格训练 数据规模&#xff1a;建议20-50张高质量图片&#xff08;分辨率≥10241024&#xff09;&#xff0c;覆盖多角度、多表情的平面风格。步数范围&#xff1a;总步数控制在1000-2000步&#xff0c;公式为 总步数 Repeat Image Epoch …

Web3 时代数据保护的关键挑战与应对策略

Web3 时代数据保护的关键挑战与应对策略 随着互联网技术的飞速发展&#xff0c;我们正步入 Web3 时代&#xff0c;这是一个以去中心化、用户主权和数据隐私为核心的新时代。在这个时代&#xff0c;数据保护成为了一个至关重要的议题。本文将探讨 Web3 时代数据保护面临的主要挑…

微信小程序计算属性与监听器:miniprogram-computed

小程序框架没有提供计算属性相关的 api &#xff0c;但是官方为开发者提供了拓展工具库 miniprogram-computed。 该工具库提供了两个功能&#xff1a; 计算属性 computed监听器 watch 一、安装 miniprogram-computed 在项目的根目录下&#xff0c;使用如下命令&#xff0c;…

实体机安装linux视频教程。windows和ubuntu共存。启动时选择切换引导系统。

登录ubuntu官网下载iso镜像。 https://ubuntu.com/download 桌面版带G U I 操作界面&#xff0c;服务版靠远程命令行操作&#xff0c;类似wsl&#xff0c;没有图形界面&#xff0c;显卡跑满无需分散算力到显示交互界面上。 点alter natice downloads可以下载旧版本。具体版本选…

Numpy

一、Numpy优势 学习目标 目标 了解Numpy运算速度上的优势 知道Numpy的数组内存块风格 知道Numpy的并行化运算 1 Numpy介绍 Numpy&#xff08;Numerical Python&#xff09;是一个开源的Python科学计算库&#xff0c;用于快速处理任意维度的数组。 Numpy支持常见的数组和矩…

小红书不绑定手机号会显示ip吗

小红书作为一个生活方式分享平台&#xff0c;拥有庞大的用户群体。在小红书上&#xff0c;用户可以分享自己的生活点滴、购物心得、美食体验等&#xff0c;与其他用户进行互动交流。最近&#xff0c;不少用户对于小红书是否会在不绑定手机号的情况下显示IP属地产生了疑问&#…

FPGA multiboot 方案

FPGA multiboot 方案 初版方案 初版方案不需要软件参与&#xff0c;只是为了验证flash启动。当前已完成。 使用jtag 通过vivaod harwaremanager去将fpga bit流文件加载到demo板flash中。 具体操作&#xff1a; 约束添加for golden bitstream # 设置电源参考&#xff0c;1.…

SpringBoot的启动原理?

大家好&#xff0c;我是锋哥。今天分享关于【SpringBoot的启动原理&#xff1f;】面试题。希望对大家有帮助&#xff1b; SpringBoot的启动原理&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Spring Boot的启动原理主要是通过 SpringApplication 类来…

aws训练快速入门教程

AWS 相关核心概念 简洁地介绍一下AWS训练云服务的核心关联概念: AWS核心服务层: 基础设施层: EC2(计算), S3(存储), RDS(数据库)等人工智能层: SageMaker(训练平台), AI服务等 机器学习服务分级: 高层: 预构建AI服务(开箱即用)中层: SageMaker(主要训练平台)底层: 框架和基…

(一)飞行器的姿态欧拉角, 欧拉旋转, 完全数学推导(基于坐标基的变换矩阵).(偏航角,俯仰角,横滚角)

(这篇写的全是基矢变换矩阵)不是坐标变换矩阵,坐标变换矩阵的话转置一下,之后会有推导. 是通过M转置变换到P撇点.

工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

在 Windows 系统下,将 FFmpeg 编译为 .so 文件

1. 准备环境 确保你的 Windows 系统已安装以下工具&#xff1a; Android Studio NDK&#xff08;Native Development Kit&#xff09; MSYS2&#xff08;用于提供类 Unix 环境&#xff09; FFmpeg 源码 Git Bash&#xff08;可选&#xff0c;推荐使用&#xff09; 安装 …