Kafka 源码分析——Producer

文章目录

  • 前言
  • Producer 整体流程
  • Producer 初始化
  • Producer 发送流程
    • 执行拦截器逻辑
    • 获取集群元数据
    • 序列化
    • 选择分区
    • 消息累加进缓存
    • 消息发送
  • Producer缓冲区
  • Producer 参数调优

前言

在 Kafka 中, 把产生消息的一方称为 Producer 即 生产者,它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?

Producer 整体流程

Kafka一条消息发送和消费的流程

image.png

站在源码的核心角度,可以把Producer分成以下几个核心部分:

  1. Producer初始化
  2. Producer发送流程
  3. Producer缓冲区
  4. Producer参数与调优

Producer 初始化

image.png

因为源码中有非常多的一些额外处理,所以解读源码没必要每行都读,只需要根据梳理的主流程找到核心代码进行解读就可以。

设置分区器(partitioner),分区器是支持自定义的

image.png

设置重试时间

重试时间(retryBackoffMs)默认100ms。如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数,同时retryBackoffMs是重试的间隔。

image.png

设置序列化器

image.png

设置拦截器(interceptors)

image.png

拦截器一般用得不多,可以为消息统一添加字段或统计发送失败成功次数,这些逻辑会拖慢producer的消息发送效率,不推荐生产中使用。

想要实现拦截器,我们需要先实现ProducerInterceptor接口即可,然后在生产者中设置进去即可。

image.png

image.png

image.png
上图的一些设置

  1. 设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M

  2. 设置缓存大小(totalMemorySize) 默认是32M

  3. 设置压缩格式(compressionType)

  4. 初始化RecordAccumulator也就是缓冲区指定为32M

设置缓冲区

image.png

设置消息累加器

因为生产者是通过缓冲的方式发送,所以需要一个消息累加器配合才能完成消息的发送。

image.png

初始化集群元数据(metadata)

image.png

创建Sender线程

image.png

这里还初始化了一个重要的管理网路的组件 NetworkClient

image.png

KafkaThread将Sender设置为守护线程并启动

image.png

Producer 发送流程

执行拦截器逻辑

执行拦截器逻辑,预处理消息,封装 Producer Record

image.png

获取集群元数据

从 Kafka Broker 集群获取集群元数据metadata

image.png

序列化

调用Serializer.serialize()方法进行消息的key/value序列化

image.png

选择分区

调用partition()选择合适的分区策略,给消息体 Producer Record 分配要发送的 topic 分区号

image.png

消息累加进缓存

将消息缓存到RecordAccumulator 收集器中, 最后判断是否要发送。

image.png

消息发送

真正的消息发送是Sender线程来做,并且还要结合缓冲区来处理。这里我们只需要知道发送的条件:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限。

Producer缓冲区

Kafka生产者的缓冲区,也就是内存池,可以将其类比为连接池(DB, Redis),主要是避免不必要的创建连接的开销,。这样内存池可以对 RecordBatch 做到反复利用,防止引起Full GC问题。

核心就是这段代码:

image.png

image.png

Kafka 内存设计有两部分,可用的内存(未分配的内存,初始的时候是 32M)和已经被分配了的内存,每个小 Batch 是 16K,然后这一个个的 Batch 就可以被反复利用,不需要每次都申请内存, 两部分加起来是 32M。

申请内存的过程

发送流程中会把消息放入 accumulator中,即调用 accumulator.append() 追加, 然后把消息封装成一个个Batch 进行发送,然后去申请内存(free.allocate())。

image.png

  1. 如果申请的内存大小超过了整个缓存池的大小,则抛异常出来。
  2. 如果申请的大小是每个 recordBatch 的大小(16K),并且已分配内存不为空,则直接取出来一个返回。
  3. 如果整个内存池大小比要申请的内存大小大 (this.availableMemory + freeListSize >= size),则直接从可用内存申请一块内存。

Producer 参数调优

在 Kafka 实际使用中,Producer 端既要保证吞吐量,又要确保无消息丢失,一些核心参数的配置就显得至关重要。

acks

参数说明:对于 Kafka Producer 来说是一个非常重要的参数,它表示指定分区中成功写入消息的副本数量,是 Kafka 生产端消息的持久性的保证。

max.request.size

参数说明:这个参数对于 Kafka Producer 也比较重要, 表示生产端能够发送的最大消息大小,默认值为1048576(1M)

调优建议:这个配置对于生产环境来说有点小, 为了避免因消息过大导致发送失败,生产环境建议适当调大,比如可以调到10485760(10M)

retries

参数说明:表示生产端消息发送失败时的重试次数,默认值为0,即不重试。 这个参数一般是为了解决因系统瞬时故障导致的消息发送失败,比如网络抖动、Leader 选举及重选举,其中瞬时的 Leader 重选举是比较常见的。因此这个参数的设置对于 Kafka Producer 就显得非常重要

调优建议:这里建议设置为一个大于0的值,比如3次。

retry.backoff.ms

参数说明:**设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100, **主要跟 retries 配合使用, 在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,需要设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

connections.max.idele.ms

参数说明:主要用来判断多久之后关闭空闲的链接,默认值540000(ms)即9分钟。

compression.type

参数说明: 该参数表示生产端是否要对消息进行压缩,默认值为不压缩(none)。 压缩可以显著减少网络IO传输、磁盘IO以及磁盘空间,从而提升整体吞吐量,但也是以牺牲CPU开销为代价的。

调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。对于Kafka来说,综合考虑吞吐量与压缩比,建议选择lz4压缩。如果追求最高的压缩比则推荐zstd压缩。

buffer.memory

参数说明: 该参数表示生产端消息缓冲池或缓冲区的大小,默认值为即33554432(32M) 。这个参数基本可以认为是 Producer 程序所使用的内存大小。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。

batch.size

参数说明: 该参数表示发送到缓冲区中的消息会被封装成一个一个的Batch,分批次的发送到 Broker 端,默认值为16KB。 因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到32KB,调大也意味着消息会有相对较大的延时。

linger.ms

参数说明: 该参数表示用来控制 Batch 最大的空闲时间,超过该时间的 Batch 也会自动被发送到 Broker 端。 实际情况中, 这是吞吐量与延时之间的权衡。默认值为0,表示消息需要被立即发送,无需关系 batch 是否被填满。

调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于0的值,比如设置为100,此时会在负载低的情况下带来100ms的延时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/140784.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring面试题20:Spring怎样开启注解装配?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:Spring怎样开启注解装配? 要在Spring中开启注解装配,需要进行以下几个步骤: 添加必要的依赖:在项目的构建工具(如Maven或Gradle)配置文件中…

WebGL 选中一个表面

目录 选中一个表面 示例程序(PickFace.js) 代码详解 gl.readPixels()见126行效果 gl.UNSIGNED_BYTE注意点 示例效果 选中一个表面 ​​​​​​​WebGL 选中物体_山楂树の的博客-CSDN博客可以使用同样的方法来选中物体的某一个表面。这一节在Pi…

苹果手表 Series 6 拆解

步骤 1 苹果手表 Series 6 拆解 Series 6(右)与具有一年历史的姐妹(左)的外部比较仅显示出细微的差异,但这就是拆卸的目的。我们已经知道这些细节: LTPO OLED Retina 显示屏针对常亮功能进行了优化——这次…

亚马逊 CodeWhisperer 初体验

1、CodeWhisperer 介绍 CodeWhisperer 是亚马逊出品的一款基于机器学习的通用代码生成器,可实时提供代码建议。类似 Cursor 和 Github Copilot 编码工具。 官网:AI 代码生成器 - Amazon CodeWhisperer - AWS 在编写代码时,它会自动根据您现…

【深度学习实验】前馈神经网络(三):自定义两层前馈神经网络(激活函数logistic、线性层算子Linear)

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 构建数据集 2. 激活函数logistic 3. 线性层算子 Linear 4. 两层的前馈神经网络MLP 5. 模型训练 一、实验介绍 本实验实现了一个简单的两层前馈神经网络 激活函数…

JavaScript(WebAPI)

目录 一.WebAPI 二.DOM 1.选中页面元素 2.事件 三.操作元素 获取修改元素内容 获取/修改表单元素属性 value type 获取/修改样式属性 1.修改内联样式 2.修改元素应用的CSS类名 四.操作节点 1.新增元素 2.删除元素 五.小结 六.案例 1.网页版本的猜数字 2.表白…

Python | 为FastAPI后端服务添加API Key认证(分别基于路径传参和header两种方式且swagger文档友好支持)

文章目录 01 前言02 路径传参方式添加API Key2.1 完整代码2.2 请求示例2.3 swagger文档测试 03 请求头Header方式传入API Key(推荐)3.1 完整代码3.2 请求示例3.3 swagger文档测试 01 前言 FastAPI,如其名所示,是一个极为高效的框…

CSS3有哪些新特性

CSS3 引入了许多新特性&#xff0c;以增强样式设计和页面布局的能力&#xff0c;提供更多的视觉效果和交互性。以下是一些 CSS3 中的新特性&#xff1a; 圆角边框&#xff08;Border Radius&#xff09;&#xff1a;圆角的边框&#xff0c;而不是传统的方形边框。 <!DOCTY…

Java面试被问了几个简单的问题,却回答的不是很好

作者&#xff1a;逍遥Sean 简介&#xff1a;一个主修Java的Web网站\游戏服务器后端开发者 主页&#xff1a;https://blog.csdn.net/Ureliable 觉得博主文章不错的话&#xff0c;可以三连支持一下~ 如有需要我的支持&#xff0c;请私信或评论留言&#xff01; 前言 前几天参加了…

解决0-1背包问题(方案二):一维dp数组(滚动数组)

往期文章&#xff1a;解决0-1背包问题&#xff08;方案一&#xff09;:二维dp数组_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/133207350?spm1001.2014.3001.5501 >>探索一维dp数组和二维dp数组的…

深入学习 Redis Cluster - 基于 Docker、DockerCompose 搭建 Redis 集群,处理故障、扩容方案

目录 一、基于 Docker、DockerCompose 搭建 Redis 集群 1.1、前言 1.2、编写 shell 脚本 1.3、执行 shell 脚本&#xff0c;创建集群配置文件 1.4、编写 docker-compose.yml 文件 1.5、启动容器 1.6、构建集群 1.7、使用集群 1.8、如果集群中&#xff0c;有节点挂了&am…

【沐风老师】3DMAX翻转折叠动画插件FoldFx使用方法详解

3DMAX翻转折叠动画插件FoldFx使用方法详解 3DMAX翻转折叠动画插件FoldFx&#xff0c;是3dMax运动图形工具&#xff0c;用于创建多边形折叠动画。用户几乎有无限的可能性&#xff0c;因为动画的每个方面都是可控的。 【适用版本】 适用于3dMax版本&#xff1a;2010及更新版本&a…

让Pegasus天马座开发板实现超声波测距

在完成《让Pegasus天马座开发板用上OLED屏》后&#xff0c;我觉得可以把超声波测距功能也在Pegasus天马座开发板上实现。于是在箱子里找到了&#xff0c;Grove - Ultrasonic Ranger 这一超声波测传感器。 官方地址: https://wiki.seeedstudio.com/Grove-Ultrasonic_Ranger 超声…

OCR -- 文本检测

目标检测&#xff1a; 不仅要解决定位问题&#xff0c;还要解决目标分类问题&#xff0c;给定图像或者视频&#xff0c;找出目标的位置&#xff08;box&#xff09;&#xff0c;并给出目标的类别&#xff1b; 文本检测&#xff1a; 给定输入图像或者视频&#xff0c;找出文本的…

SpringBoot之响应处理

文章目录 前言一、返回值处理器ReturnValueHandler流程关于HttpMessageConverters的初始化ReturnValueHandler与MappingJackson2HttpMessageConverter关联 二、内容协商内容协商原理底层源码 三、自定义MessageConverter总结 前言 包括返回值处理器ReturnValueHandler、内容协…

【Vue.js】使用Element搭建登入注册界面axios中GET请求与POST请求跨域问题

一&#xff0c;ElementUI是什么&#xff1f; Element UI 是一个基于 Vue.js 的桌面端组件库&#xff0c;它提供了一套丰富的 UI 组件&#xff0c;用于构建用户界面。Element UI 的目标是提供简洁、易用、美观的组件&#xff0c;同时保持灵活性和可定制性 二&#xff0c;Element…

套接字socket编程的基础知识点

目录 前言&#xff08;必读&#xff09; 网络字节序 网络中的大小端问题 为什么网络字节序采用的是大端而不是小端&#xff1f; 网络字节序与主机字节序之间的转换 字符串IP和整数IP 整数IP存在的意义 字符串IP和整数IP相互转换的方式 inet_addr函数&#xff08;会自…

相机One Shot标定

1 原理说明 原理部分网上其他文章[1][2]也已经说的比较明白了&#xff0c;这里不再赘述。 2 总体流程 参考论文作者开源的Matlab代码[3]和github上的C代码[4]进行说明&#xff08;不得不说还是Matlab代码更优雅&#xff09; 论文方法总体分两部&#xff0c;第一部是在画面中找…

封装一个高级查询组件

封装一个高级查询组件 背景一&#xff0c;前端相关代码二&#xff0c;后端相关代码三&#xff0c;呈现效果总结 背景 业务有个按照自定义选择组合查询条件&#xff0c;保存下来每次查询的时候使用的需求。查了一下项目里的代码没有现成的组件可以用&#xff0c;于是封装了一个 …

Python实战实例代码-网络爬虫-数据分析-机器学习-图像处理

Python实战实例代码-网络爬虫-数据分析-机器学习-图像处理 Python实战实例代码1. 网络爬虫1.1 爬取网页数据1.2 爬取图片1.3 爬取动态数据&#xff08;使用Selenium&#xff09; 2. 数据分析2.1 数据清洗2.2 数据变换2.3 数据聚合 3. 机器学习3.1 线性回归3.2 随机森林3.3 K-Me…