kafka学习-生产者

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/127097.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文速学-让神经网络不再神秘,一天速学神经网络基础(七)-基于误差的反向传播

前言 思索了很久到底要不要出深度学习内容&#xff0c;毕竟在数学建模专栏里边的机器学习内容还有一大半算法没有更新&#xff0c;很多坑都没有填满&#xff0c;而且现在深度学习的文章和学习课程都十分的多&#xff0c;我考虑了很久决定还是得出神经网络系列文章&#xff0c;…

Linux安装kibana

相关链接 https://www.elastic.co/cn/downloads/kibana https://artifacts.elastic.co/downloads/kibana/kibana-7.5.1-linux-x86_64.tar.gz 官网下载可能比较慢&#xff0c;下面提供下载地址 百度云链接&#xff1a;https://pan.baidu.com/s/1d9Cqr9EwHF94op90F57bww 提取码…

Elasticsearch实战(五):Springboot实现Elasticsearch电商平台日志埋点与搜索热词

文章目录 系列文章索引一、提取热度搜索1、热搜词分析流程图2、日志埋点&#xff08;1&#xff09;排除logback的默认集成。&#xff08;2&#xff09;引入log4j2起步依赖&#xff08;3&#xff09;设置配置文件&#xff08;4&#xff09;配置文件模板&#xff08;5&#xff09…

计算机竞赛 基于深度学习的人脸性别年龄识别 - 图像识别 opencv

文章目录 0 前言1 课题描述2 实现效果3 算法实现原理3.1 数据集3.2 深度学习识别算法3.3 特征提取主干网络3.4 总体实现流程 4 具体实现4.1 预训练数据格式4.2 部分实现代码 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 毕业设计…

使用 【jacoco】对基于 SpringBoot 和 Dubbo RPC 的项目生成测试覆盖率报告:实践+原理

基于 Dubbo RPC 的项目中有一个提供者项目backend、一个消费者项目gateway、以及注册中心nacos。本篇文章记录在windows本地对该框架的测试过程&#xff0c;以及介绍jacoco的基本原理 测试过程 官网下载安装包解压到本地&#xff0c;https://www.jacoco.org/jacoco/ 只需要用…

vue.js+nodejs家庭个人理财收支管理系统5x6nf

本收支管理系统以vue.js作为框架&#xff0c;nodejs语言&#xff0c;B/S模式以及MySql作为后台运行的数据库。本系统主要包括以下功能模块&#xff1a;用户管理、收入分类、支出分类、每日收入、每日支出等模块。 本文的组织结构如下&#xff1a; 1、绪论。综述了本文的研究背景…

Redis快速入门

文章目录 0. Redis介绍1. Centos下Redis安装2. redis.conf配置文件介绍3. redis相关命令4. 启动3.2 **命令行操作**3.3 Redis压测命令 4.redis中发布订阅和事务4.1 发布订阅&#xff08;Pub/Sub&#xff09;4.2 事务 5. redis封装系统服务6. 问题与解决6.1 启动Redis报错&#…

荣耀崛起阵容推荐,荣耀崛起最强阵容

今天给大家带来的荣耀崛起阵容推荐是新手阵容推荐&#xff0c;以核心输出为点&#xff0c;由点及面&#xff0c;来展开叙述阵容&#xff01; 关注【娱乐天梯】&#xff0c;获取荣耀崛起0.1折内部福利号 荣耀崛起最强阵容兽族战神流&#xff1a; 此阵容是以战士为核心&#xff0…

机器学习_特征工程_特征数据的评价标准

本文主要从 单特征分析&#xff0c;多特征筛选&#xff0c;特征监控&#xff0c;外部特征评估的几个方面对特征数据进行阐述。 来源 &#xff1a; 特征筛选_特征覆盖度怎么算_adamyoungjack的博客-CSDN博客 1. 单特征分析 1.1 简介 好特征可以从几个角度衡量&#xff1a;覆…

geopandas 笔记: datasets 数据集

geopandas 自带的几个数据集 1 世界各个国家 import geopandas as gpd import pandas as pdpd.set_option(display.max_rows,None) gpd.read_file(gpd.datasets.get_path(naturalearth_lowres)) pop_est人口数量continent国家所在的大陆name国家的名称iso_a3国家的三个字母的…

初识集合框架 -Java

目录 一、集合框架的概念 二、集合框架的重要性 三、涉及的数据结构和算法 3.1 什么是数据结构 3.2 集合框架&#xff08;容器&#xff09;背后对应的数据结构 3.3 相关的Java知识 3.4 什么是算法 3.5 如何学好数据结构和算法 一、集合框架的概念 Java 集合框架&#xff0c;…

山寨能走多远?盗版还是盗火?KCC@深圳活动圆满举办

2023.8.19 &#xff0c;我们组织了一场大家期待已久的线下活动&#xff1a; KCC深圳&#xff1a;Kickoff & 读书会Meetup 来了深圳都是深圳人&#xff0c;没有哪座城市能够像深圳代表中国改革开放的发展历程&#xff0c;年轻、上进、有活力、拥抱新事物&#xff0c;同样&am…

脚本:python实现动态爱心

文章目录 效果代码Reference python实现dynamic heart 效果 代码 import turtle as tu import random as ratu.setup(0.5, 0.5) # 设置画板大小&#xff08;小数表示比例&#xff0c;整数表示大小&#xff09; tu.screensize(1.0, 1.0) # 设置屏幕大小 tu.bgcolor(black) #…

系统架构设计师(第二版)学习笔记----多媒体技术

【原文链接】系统架构设计师&#xff08;第二版&#xff09;学习笔记----多媒体技术 文章目录 一、多媒体概述1.1 媒体的分类1.2 多媒体的特征1.3 多媒体系统的基本组成 二、多媒体系统的关键技术2.1 多媒体系统的关键技术2.2 视频技术的内容2.3 音频技术的内容2.4 数据压缩算法…

2023挖漏洞给报酬的网站汇总,兼职副业3天收益2000

一、众测平台(国内)二、前沿漏洞研究奖励计划三、行业SRC四、企业应急响应中心-SRC-汇总 1、互联网企业2、生活服务、住宿、购物相关企业3、物流、出行、旅游4、金融相关企业5、视频游戏直播社交娱乐6、教育、问答、知识付费7、泛科技通讯物联网云服务8、安全企业9、其他 一、…

java中log使用总结

目录 一、概述1.1. 核心日志框架1.2 门面日志框架 二、最佳实践2.1 核心日志框架API包2.2 门面日志框架依赖2.3 集成使用2.3.1 集成jcl2.3.2 集成slf4j2.3.2.1 slf4j集成单一框架2.3.2.2 slf4j整合混合框架 三、总结3.1 所有相关包3.1.1 核心日志框架包3.1.2 门面日志框架3.1.3…

Linux中安装MySQL5.7.42

1. 首先&#xff0c;下载mysql5.7.42的安装包&#xff08;下方是下载地址&#xff09;&#xff0c;选择红色框框的下载&#xff08;注意的是&#xff0c;这个链接只提供5.7的版本下载&#xff0c;可能还会更新&#xff0c;不一定打开就是5.7.42的版本&#xff0c;后续可能会有4…

html常用标签

文章目录 注释标题段落换行格式化标签图片超链接表格合并单元格 列表无序列表--- ul li有序列表--- ol li自定义列表--- dl (总标签) dt (小标题) dd (围绕标题来说明) 表单forminput文本框密码框单选框复选框普通按钮提交按钮清空按钮选择文件 labelselecttextarea 无语义标签…

UMA 2 - Unity Multipurpose Avatar☀️六.Advanced Occlusion高级遮挡功能解决皮肤服饰穿模

文章目录 🟥 本节功能效果展示🟧 基础项目配置🟨 本节项目配置🟩 配置MeshHideAsset1️⃣ 创建MeshHideAsset2️⃣ 配置SlotDataAsset3️⃣ 配置遮挡信息🟦 将 MeshHideAsset 配置到 Recipe🟥 本节功能效果展示 未遮挡前的穿模问题: 遮挡后效果:

2023工博会,正运动超高速PCIe实时运动控制卡应用预览(一)

展会倒计时&#xff1a;11天 本次的中国国际工业博览会正运动技术将携超高速PCIe实时运动控制卡亮相。 •为智能装备提供高速高精运动控制解决方案&#xff1b; •内部搭载运动控制实时内核MotionRT7; •提供多路高速IO输入输出&#xff0c;具备多种实时运动控制功能&#x…