Kakfa - Producer机制原理与调优

Producer是Kakfa模型中生产者组件,也就是Kafka架构中数据的生产来源,虽然其整体是比较简单的组件,但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么,怎么发送的消息?IO通讯模型是什么?在实际工作中,怎么调优来实现高效性?

简单的生产者程序:

一、客户端初始化  KafkaProducer

new KafkaProducer() 是Producer初始化过程,比如Interceptor、Serializer、Partitioner、RecordAccumulator等。当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,最终读取按批次发送。

以下跟踪比较核心的机制流程:

1、 初始化RecordAccumulator记录累加器

简单介绍:RecordAccumulator可以理解为Producer发送数据缓冲区,Producer数据发送时并不会直接连接Broker后,一条一条的发送,而是会将数据(Record)放入RecordAccumulator中按批次发送。

2、初始化Sender的Iothread,在Producer在初始化过程中,会额外的创建一个ioThread。

二、Send方法

到此位置Kafka只是做了一些初始化的工作,没有与kafka集群建立连接,更没有相关元数据信息。那继续看send中的doSend方法。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {try {// waitOnMetadata更新元数据clusterAndWaitTime = this.waitOnMetadata(record.topic(),record.partition(), this.maxBlockTimeMs);} catch (KafkaException var19) {Cluster cluster = clusterAndWaitTime.cluster;........byte[] serializedKey;try {// 序列化serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var18) {........byte[] serializedValue;try {serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} // 获取元数据中partition信息int partition = this.partition(record, serializedKey, serializedValue, cluster);// ..........// 数据append到accumulator中RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);..........return result.future;

1、WaitonMetadat元数据更新

方法内部会优先判断当前Cluster是否存在元数据partitions,如果不存在意味着还没有建立连接获取元数据,此时它会wakeup唤醒sender线程。

注意:此时Cluster并不是完全时空的,它已经有指定的Node列表信息。

在早期版本的时候元数据是存储在zookeeper中的, 元数据指的是集群中分区信息、节点信息、以及节点、主题、分区的映射关系等。在生产者启动的时候没有元数据的支撑,是无法进行数据的发送的,等于瞎子。但是zookeeper存储元数据,在并发场景下会对zookeeper产生网卡压力,那就意味着要保障Kakfa可靠性的前提就要保障zookeeper的可靠性。

所以在1.0版本之后,Kafka将元数据维护在了Broker节点中。Producer可以通过Borker获取元数据,减少对zookeeper的依赖。只有一些核心的内容交给zookeeper做分布式协调。

2、Sender线程run方法

Sender线程中run方法,一个while(runing),这是一中Loop过程一种常见的响应式编程方式,比如Redis服务中也是一种EventLoop事件轮询过程。

其内部核心方法NetWorkClient.poll实现了客户端连接、数据发送、事件处理工作。

metadataUpdater.maybeUpdate方法在第一次被执行时,因为没有元数据节点信息,会执行this.maybeUpdate(now, node)方法,方法内部实现了initiateConnect方法用于客户端建立连接,其底层就是使用的Java Nio的Selector多路复用器。

建立连接之后,nioSelector.select()等待事件响应。

之后触发handleCompletedReceives处理器进行元数据同步过程。

注意: 在完成元数据更新以后,metadata.update会调用 this.notifyAll(),唤醒阻塞的main线程,进行数据发送工作。

到此为止主线程waitOnMetadata方法完成元数据的更新。

之后main就开始处理Serializer序列化,获取partition元数据信息,以及数据发送工作。

3、RecordAccumulator 记录累加器

生产者在发送数据时,并不是建立连接后每消息发送的,而是会将消息按批次发送。RecordAccumulator 对象中batches会为每一个TopicPartition维护一个双端队列。用于缓存record数据。

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches

TopicPartition 主题分区:缓冲区按照主题分为不同的双端队列

Deque<ProducerBatch> 双端队列,ProducerBatch:一批次的数据(多个数据,默认容量16k)

结构如下:

生产者在往batches中添加数据时,使用了Sychronized,所以Producer在多线程场景下是线程安全的。

为什么要有RecordAccumulator ?

RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数,减少网卡IO压力,提升性能效率。

相关参数配置以及调优点:

1、RecordAccumulator buffer.memory默认大小32mb

指每一个new KafkaProducer中RecordAccumulator的batches所有承载的最大buffer.memory=32mb。

设置方式:properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3210241024);

如果RecordAccumulator 缓存空间满时,会进行阻塞,等待数据被消费,如果指定时间内消息没有发送除去,即仍然是满状态,则抛出异常,默认60s。

设置方式:properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000);

优化点: 根据业务需求,如果TopicPartition较多,而数据量很大时,这是及时单个TopicPartition中batche很少,可能总的容量也超过32mb,这时可以扩大buffer_memery大小。

2、Kafka中单个batche大小默认为16k。

指每个batche大小为16k。batche用于存储数据Record。

如果record < 16k,则batche可以存储多个数据,此时batche空间是会被重复利用的。

如果record > 16k,则当前record会额外申请存储空间,使用完后销毁。

优化点:batche大小需要根据业务评估,不要有过多大record存在,确保每一个batche可以容纳record,尽量减少内存空间的频繁申请和销毁,以及内存碎片化。

 3、同步阻塞和非阻塞的选择

RecordAccumulator用于支持分批次发送数据。在KafkaProducer中send方法是异步接口,通过 send.get()方法可以使其阻塞,等待数据返回。

实现同步的发送数据,需要等待kafka接收了record后响应,producer才会进行下一个record发送。此时虽然会有更高一致性,但RecordAccumulator就失去了意义

非阻塞send情况下,当生产和消费端IO不对称时,可以通过LINGRE_MS_CONFG 30 来要求sender线程每次拉取RecordAccumulator中数据时等待一段时间再拉取,尽量确保按批次拉取,减少更多的网络IO。

设置方式:properties.put(ProducerConfig.LINGRE_MS_C0NFG , 0);

继续内容分析

到此当Main线程将数据append到RecordAccumulator容器后,其核心的工作就结束了,此时它也会调用sender.wakeup,告知已经有数据需要处理了,并确保sender线程不会select阻塞住。

Sender线程是一个Loop过程,在发送数据过程中,会从RecordAccumulator中拉取批次数据进行打包发送,并不是一个个batche发送。默认封装的包大小为1mb。

设置方式:pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG , String.valueOf(1 * 1024 * 1024))

Sender线程在真正发送数据前,还额外存储了Request数据到InFilgntRequest(飞行中的包),InFilgntRequest 默认大小为5,意思是指生产者向kafka发送5个包request后,都没有回应时,则停止发送变成阻塞状态。

这种设计在同步发送过程没有作用的,因为同步过程是每请求返回的。

SEND_BUFFER_CONFIG 发送缓冲区配置、RECEIVE_BUFFER_CONFIG 接收缓冲区配置,这两个就是IO层的缓冲区配置了,不同的操作系统可能不一样。设置成-1,代表默认使用系统分配大小。

pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG , String.valueOf(32 * 1024));
pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG , String.valueOf(32 * 1024));

查看内核默认配置:

到此Producer整个数据发送流程机制就清楚了,Ack的设定涉及到Broker数据同步和Consumer消费状态,这块单独再进行分析。

总结一下:

1、Producer的实现是由Main线程和Sender线程组合完成的。

Main线程核心完成了数据的输入、Producer初始化和数据append到RecordAccumulator工作,具体的元数据的更新、数据发送等IO操作都是都Sender线程完成。

Sender线程工作模式是中间件中比较常见的响应式编程模式。其在Loop过程中进行客户端连接、元数据更新、数据打包发送等工作。

2、Kakfa中IO操作封装了Java中Nio的实现(Selector),底层是多路复用器的实现,而不是netty。

3、Producer发送数据过程并不是简单的一条一条数据发送,其内部封装RecordAccumulator、Batche、Request包,可以实现按批次发送数据,减少IO次数。同时结合FilghtRequest飞行中请求大小限制,确保kafka未正常响应时,抛出异常防止数据丢失。在开发过程中,可以通过调整参数,来达到优化目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/135068.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus黑盒测试模块,监控TCP端口+ HTTP/HTTPS路由状态

文章目录 一、黑盒测试使用场景二、安装blackbox-exporter三、监控TCP端口四、监控HTTP/HTTPS路由五、最后分享几款Grafana模板 一、黑盒测试使用场景 官方下载地址 blackbox-exporter是Prometheus官方提供的一个黑盒测试的解决方案&#xff0c;可用于以下使用场景&#xff1a…

大数据-玩转数据-Flink恶意登录监控

一、恶意登录 对于网站而言&#xff0c;用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败&#xff0c;就有可能是出现了程序的恶意攻击&#xff0c;比如密码暴力破解。 因此我们考虑&#xff0c;应该对用户的登录失败动作进行统计&#xff0c;具体来说&#x…

SpringBoot启动方式

SpringBoot启动方式 springboot的启动经过了一些一系列的处理&#xff0c;我们先看看整体过程的流程图 SpringBoot的启动方式 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><…

线性代数基础-矩阵

八、矩阵的基础概念 1.矩阵 我们忘掉之前行列式的一切&#xff0c;列一种全新的数表&#xff0c;虽然长得很像&#xff0c;但是大不相同&#xff0c;首先一个区别就是矩阵不能展开成一个值&#xff0c;这里不讨论矩阵的空间意义 { a 11 x 1 a 12 x 2 a 13 x 3 . . . a 1…

求解灰度直方图,如何绘制灰度直方图(数字图像处理大题复习 P1)

文章目录 1. 画 X 轴2. 画直方图3. Complete 视频原链接 数字图像处理期末考试大题 B站链接 1. 画 X 轴 2. 画直方图 有几个 0 就在图上画多高&#xff0c;同理有几个 1 &#xff0c;X1 的地方就画多高 3. Complete 这里的情况比较平均&#xff0c;一般来说不会这么平均&a…

CSS选择器笔记

A plate #id #fancy A B plate apple #id A #fancy pickle .classname .small A.className orange.small #id.className #big.wide A,B both plate,bento * all A * plate * AB 紧跟在盘子后的苹果 plate apple A~B 跟在盘子后面所有的泡菜 plate~b…

堆内存与栈内存

文章目录 1. 栈内存2. 堆内存3. 区别和联系参考资料 1. 栈内存 栈内存是为线程留出的临时空间 每个线程都有一个固定大小的栈空间&#xff0c;而且栈空间存储的数据只能由当前线程访问&#xff0c;所以它是线程安全的。栈空间的分配和回收是由系统来做的&#xff0c;我们不需…

LeetCode 周赛上分之旅 #45 精妙的 O(lgn) 扫描算法与树上 DP 问题

⭐️ 本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 和 BaguTree Pro 知识星球提问。 学习数据结构与算法的关键在于掌握问题背后的算法思维框架&#xff0c;你的思考越抽象&#xff0c;它能覆盖的问题域就越广&#xff0c;理解难度…

Qt基于paintEvent自定义CharView

Qt基于paintEvent自定义CharView 鼠标拖动&#xff0c;缩放&#xff0c;区域缩放&#xff0c; 针对x轴&#xff0c;直接上代码 charview.h #ifndef CHARVIEW_H #define CHARVIEW_H#include <QWidget> #include <QPainter> #include <QPaintEvent> #inclu…

unity学习第1天

本身也具有一些unity知识&#xff0c;包括Eidtor界面使用、Shader效果实现、性能分析&#xff0c;但对C#、游戏逻辑不太清楚&#xff0c;这次想从开发者角度理解游戏&#xff0c;提高C#编程&#xff0c;从简单的unity游戏理解游戏逻辑&#xff0c;更好的为工作服务。 unity201…

CMU 15-445 Project #3 - Query Execution(Task #1、Task #2)

文章目录 一、题目链接二、准备工作三、SQL 语句执行流程四、BusTub 表结构五、Task #1 - Access Method Executors5.1 顺序扫描执行器5.2 插入执行器5.3 删除执行器5.4 索引扫描执行器 六、Task #2 - Aggregation & Join Executors6.1 聚合执行器6.2 循环连接执行器6.3 索…

python 二手车数据分析以及价格预测

二手车交易信息爬取、数据分析以及交易价格预测 引言一、数据爬取1.1 解析数据1.2 编写代码爬1.2.1 获取详细信息1.2.2 数据处理 二、数据分析2.1 统计分析2.2 可视化分析 三、价格预测3.1 价格趋势分析(特征分析)3.2 价格预测 引言 本文着眼于车辆信息&#xff0c;结合当下较…

Linux的调试工具 - gdb(超详细)

Linux的调试工具 - gdb 1. 背景2. 开始使用指令的使用都用下面这个C语言简单小代码来进行演示&#xff1a;1. list或l 行号&#xff1a;显示文件源代码&#xff0c;接着上次的位置往下列&#xff0c;每次列10行。2. list或l 函数名:列出某个函数的源代码。3. r或run: 运行程序。…

7.前端·新建子模块与开发(自动生成)

文章目录 学习地址视频笔记自动代码生成模式开发增删改查功能调试功能权限分配 脚本实现权限分配 学习地址 https://www.bilibili.com/video/BV13g411Y7GS/?p15&spm_id_frompageDriver&vd_sourceed09a620bf87401694f763818a31c91e 视频笔记 自动代码生成模式开发 …

简单返回封装实体类(RespBean)

RespBean的作用 返回状态码&#xff0c;返回信息&#xff0c;返回数据 package com.example.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;Data AllArgsConstructor NoArgsConstructor public class RespBean {private lon…

AndroidStudio 安装与配置【安装教程】

1.下载软件 进入官网https://developer.android.google.cn/studio&#xff0c;直接点击下载 2.阅读并同意协议书 直接下滑至最底部 如果这里出现了无法访问 官方地址&#xff1a;https://redirector.gvt1.com/edgedl/android/studio/install/2022.3.1.19/android-studio-2022.…

Windows11系统C盘用户文件夹下用户文件夹为中文,解决方案

说明&#xff1a; 1. 博主电脑为Windows11操作系统&#xff0c;亲测有效&#xff0c;修改后无任何影响&#xff0c;软件都可以正常运行&#xff01; 2. Windows10系统还不知道可不可行&#xff0c;因为Windows11的计算机管理中没有本地用户和组&#xff0c;博主在csdn上看到很…

后端中间件安装与启动(Redis、Nginx、Nacos、Kafka)

后端中间件安装与启动 RedisNginxNacosKafka Redis 1.打开cmd终端&#xff0c;进入redis文件目录 2.输入redis-server.exe redis.windows.conf即可启动&#xff0c;不能关闭cmd窗口 &#xff08;端口配置方式&#xff1a;redis目录下的redis.windows.conf配置文件&#xff0c;…

Pytorch-MLP-Mnist

文章目录 model.pymain.py参数设置注意事项初始化权重如果发现loss和acc不变关于数据下载关于输出格式 运行图 model.py import torch.nn as nn import torch.nn.functional as F import torch.nn.init as initclass MLP_cls(nn.Module):def __init__(self,in_dim28*28):super…

快递、外卖、网购自动定位及模糊检索收/发件地址功能实现

概述 目前快递、外卖、团购、网购等行业 &#xff1a;为了简化用户在收发件地址填写时的体验感&#xff0c;使用辅助定位及模糊地址检索来丰富用户的体验 本次demo分享给大家&#xff1b;让大家理解辅助定位及模糊地址检索的功能实现过程&#xff0c;以及开发出自己理想的作品…