kafka生产端之拦截器、分区器、序列化器

文章目录

  • 拦截器
  • 序列化器
  • 分区器

拦截器

拦截器(Interceptor)是早在Kafka0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容,有关消费者拦截器的具体细节先不讲。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:

``
public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record);

public void onAcknowledgement (RecordMetadata metadata,Exception exception);

public void close();
``

KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend0方法来对消息进行相应的定制化操作。一般来说最好不要修改消息ProducerRecord的topic、key和partition等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(LogCompaction)的功能。

KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgementO方法,优先于用户设定的Caliback之前执行。这个方法运行在Producer 的IO线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close0方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

自定义拦截器时需要实现ProducerInterceptor接口,并在配置类或者配置中设置需要添加的拦截器,多个拦截器存在时会根据拦截器的添加顺序(配置类或者配置中)按顺序执行。

序列化器

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了orgapache.kafkka.common.serialization.Serializer接口。

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer那么是无法解析出想要的数据的。

如果Kafka客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。自定义序列化器需要实现Serializer接口,并在配置类或者配置中设置该自定义序列化器为生产端序列化器。

分区器

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

Kafka中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口。

接口中partition()方法用来计算分区号,返回值为int类型。partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()方法在关闭分区器的时候用来回收一些资源。

在默认分区器DefaultPartitioner的实现中,closeO是空方法,而在partition0方法中定义了主要的分区分配逻辑。如果key不为null,那么默认的分区器会对key进行哈希,最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。

除了使用Kafka提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制。如果自定义分区器还是需要在配置类或者配置中设置分区器为自定义的分区器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/14557.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解锁AI语音魅力——yoyo鹿鸣在线语音合成器,让创意声音即刻绽放!

yoyo鹿鸣-在线语音合成 人工智能语音克隆生成&#xff0c;二次元&#xff5e; AI工具 | AI探金 可以在AI探金社区来找我&#xff5e; yoyo鹿鸣 - 在线语音生成器 需求人群&#xff1a; 有语音合成需求的用户。 使用场景示例&#xff1a; 合成yoyo鹿鸣语音 等等 产品特色&a…

基于STM32的智能鱼缸水质净化系统设计

&#x1f91e;&#x1f91e;大家好&#xff0c;这里是5132单片机毕设设计项目分享&#xff0c;今天给大家分享的是智能鱼缸水质净化系统。 目录 1、设计要求 2、系统功能 3、演示视频和实物 4、系统设计框图 5、软件设计流程图 6、原理图 7、主程序 8、总结 1、设计要求…

t113-qt

修改QT配置: # # qmake configuration for building with arm-linux-gnueabi-g ## MAKEFILE_GENERATOR UNIX # CONFIG incremental # QMAKE_INCREMENTAL_STYLE sublib# include(../common/linux.conf) # include(../common/gcc-base-unix.conf) # inc…

apisix的real-ip插件使用说明

k8s集群入口一般都需要过负载均衡&#xff0c;然后再到apisix。 这时候如果后台业务需要获取客户端ip&#xff0c;可能拿到的是lb或者网关的内网ip。 这里一般要获取真实ip需要做几个处理。 1. 负载均衡上&#xff0c;一般支持配置获取真实ip参数&#xff0c;需要配置上。然…

[Meet DeepSeek] 如何顺畅使用DeepSeek?告别【服务器繁忙,请稍后再试。】

文章目录 [Meet DeepSeek] 如何顺畅使用DeepSeek&#xff1f;告别【服务器繁忙&#xff0c;请稍后再试。】引言使用渠道一&#xff1a;硅基流动 Chatbox AI【推荐】硅基流动 Chatbox AI的优势 使用渠道二&#xff1a;秘塔AI搜索秘塔AI搜索的优势 其它方案1. DeepSeek官网2. 纳…

四模型消融实验!DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测

四模型消融实验&#xff01;DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测 目录 四模型消融实验&#xff01;DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 基于DCS-CNN-BiLSTM-Attention、CNN-BiLSTM-Attention…

索引(MySQL)

1. 没有索引&#xff0c;可能会有什么问题 索引&#xff1a;提高数据库的性能&#xff0c;索引是物美价廉的东西了。不用加内存&#xff0c;不用改程序&#xff0c;不用调sql&#xff0c;只要执行 正确的 create index &#xff0c;查询速度就可能提高成百上千倍。但是天下没有…

Windows 实用设置工具 v3.6.5:一键优化系统设置

这款 Windows 实用设置工具 v3.6.5 是一款功能强大的系统优化软件&#xff0c;由 kernel 开发。它提供了丰富的系统设置选项&#xff0c;帮助用户轻松管理和优化 Windows 系统。以下是该工具的主要功能和特点&#xff1a; 主要功能 隐藏电脑文件夹 视频、文档、图片、音乐、下…

快速上手Vim的使用

Vim Linux编辑器-vim使用命令行模式下所有选项都可以带数字底行模式可视块模式&#xff08;ctrlV进入&#xff09; Linux编辑器-vim使用 Vim有多种模式的编辑器。能帮助我们很快的进行代码的编辑&#xff0c;甚至完成很多其他事情。 默认情况下我们打开vim在命令模式下&#x…

334递增的三元子序列贪心算法(思路解析+源码)

文章目录 题目思路解析源码总结题目 思路解析 有两种解法:解法一:动态规划(利用dp找到数组最长递增序列长度,判断是否大于3即可)本题不适用,因为时间复杂度为O(n^2),超时。 解法二:贪心算法:解法如上图,题目要求长度为三,设置第一个元素为长度1的值,是指长度二的…

sqli-labs靶场实录(二): Advanced Injections

sqli-labs靶场实录: Advanced Injections Less21Less22Less23探测注入点 Less24Less25联合注入使用符号替代 Less25aLess26逻辑符号绕过and/or过滤双写and/or绕过 Less26aLess27Less27aLess28Less28aLess29Less30Less31Less32&#xff08;宽字节注入&#xff09;Less33Less34Le…

Websocket从原理到实战

引言 WebSocket 是一种在单个 TCP 连接上进行全双工通信的网络协议&#xff0c;它使得客户端和服务器之间能够进行实时、双向的通信&#xff0c;既然是通信协议一定要从发展历史到协议内容到应用场景最后到实战全方位了解 发展历史 WebSocket 最初是为了解决 HTTP 协议在实时…

Java 大视界 -- Java 大数据在智能供应链中的应用与优化(76)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

使用AI工具(Deepseek or 豆包etc)话业务流程图

①打开AI工具&#xff0c;这里以Deepseek为例子&#xff1a; Deepseek官网 ②输入所要画图的业务流程的文字。 &#xff08;这里以一个用户登录的流程的文字作为例子&#xff09; mermaid在线画图网页&#xff08;根据AI工具对应生成的画图代码&#xff09; ③把AI工具生成的…

Qt+海康虚拟相机的调试

做机器视觉项目的时候&#xff0c;在没有相机或需要把现场采集的图片在本地跑一下做测试时&#xff0c;可以使用海康的虚拟相机调试。以下是设置步骤&#xff1a; 1.安装好海康MVS软件&#xff0c;在菜单栏->工具选择虚拟相机工具&#xff0c;如下图&#xff1a; 2.打开虚拟…

Docker安装Mysql

1.拉取Mysql docker pull mysql:8.3.02.检查成功了没有 docker images mysql:8.3.03.创建先关目录 # conf放配置文件&#xff0c;data放数据&#xff0c;log放日志 mkdir -p /home/mysql/{conf,data,log}4.创建配置文件 vim /home/mysql/conf/my.cnf把这些cv进去&#xff…

OpenCV:图像修复

目录 简述 1. 原理说明 1.1 Navier-Stokes方法&#xff08;INPAINT_NS&#xff09; 1.2 快速行进方法&#xff08;INPAINT_TELEA&#xff09; 2. 实现步骤 2.1 输入图像和掩膜&#xff08;Mask&#xff09; 2.2 调用cv2.inpaint()函数 2.3 完整代码示例 2.4 运行结果 …

数字化转型的三个阶段:信息化、数字化、数智化

在当今快速迭代的数字时代&#xff0c;企业的生存与发展已与数字化转型浪潮紧密相连。数字化转型不仅是对传统业务模式的深度革新&#xff0c;更是企业适应未来市场、提升竞争力的关键路径。这一过程并非一蹴而就&#xff0c;而是循序渐进地分为信息化、数字化、数智化三个阶段…

Spring Boot篇

为什么要用Spring Boot Spring Boot 优点非常多&#xff0c;如&#xff1a; 独立运行 Spring Boot 而且内嵌了各种 servlet 容器&#xff0c;Tomcat、Jetty 等&#xff0c;现在不再需要打成 war 包部署到 容器 中&#xff0c;Spring Boot 只要打成一个可执行的 jar 包就能独…

Python----Python高级(网络编程:网络基础:发展历程,IP地址,MAC地址,域名,端口,子网掩码,网关,URL,DHCP,交换机)

一、网络 早期的计算机程序都是在本机上运行的&#xff0c;数据存储和处理都在同一台机器上完成。随着技术的发展&#xff0c;人 们开始有了让计算机之间相互通信的需求。例如安装在个人计算机上的计算器或记事本应用&#xff0c;其运行环 境仅限于个人计算机内部。这种设置虽然…