【kafka】生产者

1. 主要参数:

  1. · **bootstrap.servers:**该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“​”​。
    (并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。)

  2. key.serializer /value.serializer指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如:org.apache.kafka.common.serialization.StringSerializer

  3. client.id 设定KafkaProducer对应的客户端id,默认值为“​”​。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”​“producer-2”​,即字符串“producer-”与数字的拼接

  4. acks
    acks=1:可靠性和吞吐量之间的折中
    1)默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应;
    2)如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息;
    3)如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息

     **acks=0**:生产者发送消息之后不需要等待任何服务端的响应。acks 设置为 0 可以达到最大的吞吐量。**acks=-1或acks=all** :需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。如果ISR中可能只有leader副本,这样就退化成了acks=1的情况acks参数配置的值是一个字符串类型,而不是整数类型。![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/ecfd5e9216244a2bb0490596e2c7e560.png)
    
  5. max.request.size:
    用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB;涉及一些其他参数的联动,比如broker端的message.max.bytes参数
    如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会报出如下的异常:
    在这里插入图片描述

  6. retries和retry.backoff.ms
    用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作,如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
    retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
    在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。

  7. compression.type:
    “gzip”​“snappy”和“lz4”​。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

  8. connections.max.idle.ms:指定在多久之后关闭限制的连接,默认值是540000(ms)​,即9分钟。

  9. linger.ms指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

  10. receive.buffer.bytes 设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B)​,即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。

  11. send.buffer.bytes
    用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B)​,即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

  12. request.timeout.ms这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)​。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

  13. **其他参数:**略~

另外:
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

2. 消息返送:

  1. fire-and-forget(发后即忘):
    性能最好,可靠性最差
  2. sync(同步)
    在这里插入图片描述
    send(​)方法本身就是异步的,send(​)方法返回的Future对象可以使调用方稍后获得发送的结果。可以在执行send(​)方法之后直接链式调用了get(​)方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。

在这里插入图片描述
方法的返回值是一个Future类型的对象,那么完全可以用Java语言层面的技巧来丰富应用的实现,比如使用Future中的 get(long timeout,TimeUnit unit)方法实现可超时的阻塞。
可重试异常:
常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException等

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。
在这里插入图片描述
同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

  1. async(异步)
    一般是在send(​)方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
    在这里插入图片描述
    消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。

一个KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用KafkaProducer的close(​)方法来回收资源。
在这里插入图片描述
close(​)方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。如果调用了带超时时间timeout的close(​)方法,那么只会在等待timeout时间内来完成所有尚未完成的请求处理,然后强行退出。在实际应用中,一般使用的都是无参的close(​)方法。

3. 序列化:

除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口
在这里插入图片描述
configure(​)方法用来配置当前类,serialize(​)方法用来执行序列化操作。而close(​)方法用来关闭当前的序列化器,一般情况下 close(​)是一个空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

4. 分区器:

如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
在这里插入图片描述
中partition(​)方法用来计算分区号,返回值为int类型。partition(​)方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。
close(​)方法在关闭分区器的时候用来回收一些资源。
Partitioner 接口还有一个父接口org.apache.kafka.common.Configurable,这个接口中只有一个方法:在这里插入图片描述
用来获取配置信息及初始化数据。
如果key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率)​,最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

注意:
在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。

只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制

5. 拦截器:

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等

产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
在这里插入图片描述
r在将消息序列化和计算分区之前会调用生产者拦截器的onSend(​)方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息

Producer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement(​)方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close(​)方法主要用于在关闭拦截器时执行一些资源的清理工作。

在这里插入图片描述
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开

6.生产者原理:

在这里插入图片描述
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send(​)方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/427110.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Google软件测试之道》笔记

介绍 GTAC:Google Test Automation Conference,Google测试自动化大会。 本书出版之前还有一本《微软测试之道》,值得阅读。 质量不是被测试出来的,但未经测试也不可能开发出有质量的软件。质量是开发过程的问题,而不…

ROS第五梯:ROS+VSCode+C++单步调试

解决问题:在ROS项目中进行断点调试。 第一步:创建一个ROS项目或者打开一个现有的ROS项目。 第二步:修改c_cpp_properties.json 增加一段命令: "compileCommands": "${workspaceFolder}/build/compile_commands.json"第三…

线结构光测量系统标定--导轨

光平面标定原理可查看之前的博文《光平面标定》,光条中心提取可参考线结构光专栏光条中心提取系列的文章,相机标定参考相机标定专栏中的博文。(欢迎进Q群交流:874653199) 线结构光测量系统(指一个线结构光传感器与一个…

rocky9虚拟机配置双网卡的详细过程

编辑虚拟机配置->添加->选择网络适配器->确认->打开虚拟机 1.ip add查看第二个网卡的名称,我这里是ens36 2.cd到网卡的配置文件目录 cd /etc/NetworkManager/system-connections/ ls3.复制一份网卡的配置文件并改名为ens36.nmconnection(根据自己的第…

计算机网络(运输层)

物理层、数据链路层以及网络层共同解决了将主机通过异构网络互联起来所面临的问题,实现了主机与主机之间的通信。 实际上在计算机网络中进行通信的真正实体事位于通信两端主机中的进程。 运输层的任务就会是提供运行在不同主机上的应用进程提供直接的通信服务&…

pybind11 学习笔记

pybind11 学习笔记 0. 一个例子1. 官方文档1.1 Installing the Library1.1.1 Include as A Submodule1.1.2 Include with PyPI1.1.3 Include with Conda-forge 1.2 First Steps1.2.1 Separate Files1.2.2 PYBIND11_MODULE() 宏1.2.3 example.cpython-38-x86_64-linux-gnu.so 的…

常见 HTTP 状态码详解与Nginx 文件上传大小限制

在我们日常使用 Nginx 搭建网站或应用服务时,可能会遇到很多与文件上传和请求响应相关的问题。今天我们就来聊聊 如何限制文件上传的大小,并介绍一些常见的 HTTP 状态码 及其在 Nginx 中的处理方式。 一、文件上传大小限制 有时,我们需要限…

从入门到精通,玩转Python的print函数(探索Python print函数的隐藏功能)

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 文章内容 📒📝 基础用法参数详解示例📝 高级用法自定义分隔符和结束符输出到文件追加模式📝 覆盖打印与进度条简单覆盖打印动态进度条示例代码⚓️ 相关链接 ⚓️📖 介绍 📖 刚开始学习编程时,我们接触到的第一个方…

【初阶数据结构】一文讲清楚 “堆” 和 “堆排序” -- 树和二叉树(二)(内含TOP-K问题)

文章目录 前言1. 堆1.1 堆的概念1.2 堆的分类 2. 堆的实现2.1 堆的结构体设置2.2 堆的初始化2.3 堆的销毁2.4 添加数据到堆2.4.1 "向上调整"算法 2.5 从堆中删除数据2.5.1 “向下调整”算法 2.6 堆的其它各种方法接口函数 3. 堆排序3.1 堆排序的代码实现 4. TOP-K问题…

微软Office全家桶再爆办公革命,o1模型加持重塑十亿人工作流!1句话生成PPT+自定义智能体

颠覆全球十亿打工人的Office办公全家桶,昨夜迎来重磅升级! 在微软Copilot第二弹发布会上,CEO纳德拉官宣,「用AI构思,共同协作的全新工作流——WebWorkPages正式开启」。 全程半小时,每一幕都在透露着&…

GPT代码记录

#include <iostream>// 基类模板 template<typename T> class Base { public:void func() {std::cout << "Base function" << std::endl;} };// 特化的子类 template<typename T> class Derived : public Base<T> { public:void…

基于JDK1.8和Maven的GeoTools 28.X源码自主构建实践

目录 前言 一、GeoTools与Jdk的版本关系 1、GeoTools与Jdk版本 2、编译环境简介 二、使用Maven编译GeoTools28.X 1、GeoTools28.x 2、Maven的完整编译 3、构建时的问题 三、总结 前言 想要学习和掌握一个开源软件或者项目&#xff0c;源码是我们主要学习的内容。学习开…

JDBC笔记

文章目录 准备MySQL数据的建立和建表 idea 建工程和模块设置属性配置文件编写JDBC代码URL的设置JDBC 代码配置文件 准备MySQL 数据的建立和建表 idea 建工程和模块 设置属性配置文件 编写JDBC代码 URL的设置 JDBC 代码 package com.yanyu;import java.sql.*; import java.util…

vue2.0+ts注册全局函数和几个递归查找

vue2.0ts注册全局函数和几个递归查找 一、main.ts 一、main.ts // 定义你的全局函数,判断是否有按钮权限 interface Item {label: string;checked: number;[k: string]: any; } // 获取按钮时候权限 function globalLable(arr: Item[], label: string): boolean {for (const i…

硬件基础知识

驱动开发分为&#xff1a;裸机驱动、linux驱动 嵌入式&#xff1a;以计算机技术为基础&#xff0c;软硬结合的、可移植、可剪裁的专用计算机 单片机最小单元&#xff1a;vcc gnd reset 晶振 cpu --- soc :system on chip 片上外设 所有的程序都是在soc&#xff08;cpu&…

1.熟悉接口测试(Postman工具)

一、接口及其类型 API&#xff0c;应用编程接口&#xff0c;简称接口 通过接口&#xff0c;可以让程序和程序之间&#xff0c;能够互相交互。 接口分为两大类&#xff1a; 1&#xff09;基于TCP全双工&#xff08;适用于postman&#xff09; 2&#xff09;基于HTTP半双工 二、…

项目管理 | 一文读懂什么是敏捷开发管理

在快速变化的商业环境中&#xff0c;项目管理方式也在不断演进&#xff0c;其中敏捷开发管理因其高效、灵活和适应性强的特点&#xff0c;逐渐成为众多企业和团队的首选。本文将详细解析敏捷开发管理的定义、具体内容及其核心角色&#xff0c;帮助读者全面理解这一先进的项目管…

普罗米修斯监控

目录 概念 部署方法 1. 二进制&#xff08;源码包&#xff09; 2. 部署在k8s集群当中&#xff0c;用pod形式部署 概念 prometheus是开源的系统监控和告警。在k8s分布式的容器化管理系统当中&#xff0c;一般都是搭配prometheus来进行监控。它是服务监控系统&#xff0c;也…

git reflog 和 git log 的详解和区别

文章目录 1. git log 介绍基本用法&#xff1a;输出内容&#xff1a;常见选项&#xff1a;git log 的局限性&#xff1a; 2. git reflog 介绍基本用法&#xff1a;输出内容&#xff1a;git reflog 输出字段&#xff1a;常见选项&#xff1a;主要用途&#xff1a;示例&#xff1…