kafka生产端之架构及工作原理

文章目录

  • 整体架构
  • 元数据更新

整体架构

消息在真正发往Kafka之前,有可能需要经历拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢?下面我们来看一下生产者客户端的整体架构,如图所示。

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为33554432B,即32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send0方法调用要么被阻塞,要么抛出异常,这个取决于参数max.b1ock.ms的配置,此参数的默认值为60000,即60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧漆。与此同时,将较小的ProducerRecord拼漆成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。

消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。

ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque>的保存形式转变成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这单需要做一个应用逻辑层面到网络IO层面的转换。

在转换成<Node,ListProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId,Deque>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

元数据更新

上面提及的InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于图中的InFlightRequests来说,图中展示了三个节点Node0、Node1和Node2,很明显Node1的负载最小。也就是说,Node1为当前的leastLoadedNodec选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
在这里插入图片描述

我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker节点的地址、端口等信息才能建立连接,最终才能将消息发送到Kafka,在这一过程中所需要的信息都属于元数据信息。

在上面的讲解中我们了解了bootstrap.servers参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/16909.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二、交换机的vlan子设备接入

一、交换机的vlan设置-CSDN博客 二、交换机的vlan子设备接入-CSDN博客 接上篇的文章&#xff0c;本文接入了子设备 网络结构如下&#xff1a; 用路由器A和POE交换机B代替第一篇中的笔记本电脑&#xff0c;路由器A和交换机B都关闭DHCP服务&#xff0c;并分别接入一个IPC&#…

DedeBIZ系统审计小结

之前简单审计过DedeBIZ系统&#xff0c;网上还没有对这个系统的漏洞有过详尽的分析&#xff0c;于是重新审计并总结文章&#xff0c;记录下自己审计的过程。 https://github.com/DedeBIZ/DedeV6/archive/refs/tags/6.2.10.zip &#x1f4cc;DedeBIZ 系统并非基于 MVC 框架&…

C语言基本概念————讨论sqrt()和pow()函数与整数的关系

本文来源&#xff1a;C语言基本概念——讨论sqrt()和pow()函数与整数的关系. C语言基本概念——sqrt和pow函数与整数的关系 1. 使用sqrt()是否可以得到完全平方数的精确的整数平方根1.1 完全平方数的计算结果是否精确&#xff1f;1.2 为什么不会出现误差&#xff08;如 1.99999…

浏览器自动化与AI Agent结合项目browser-use初探

browser-use介绍 browser-use是将您的 AI 代理连接到浏览器的最简单方式。它通过提供一个强大且简单的接口来实现 AI 代理访问网站的自动化。 GitHub地址&#xff1a;https://github.com/browser-use/browser-use。目前已经获得了27.3k颗stars&#xff0c;2.7kforks&#xff…

阿里云 DeepSeek 模型部署与使用技术评测

引言 随着人工智能技术的不断发展&#xff0c;越来越多的企业和个人开始探索如何利用深度学习模型来提升业务效率和用户体验。阿里云推出的【零门槛、轻松部署您的专属 DeepSeek 模型】解决方案为用户提供了多种便捷的部署方式&#xff0c;包括基于百炼 API 调用满血版、基于人…

第二天:工具的使用

每天上午9点左右更新一到两篇文章到专栏《Python爬虫训练营》中&#xff0c;对于爬虫有兴趣的伙伴可以订阅专栏一起学习&#xff0c;完全免费。 键盘为桨&#xff0c;代码作帆。这趟为期30天左右的Python爬虫特训即将启航&#xff0c;每日解锁新海域&#xff1a;从Requests库的…

vue项目 Axios创建拦截器

Axios 1. Axios 和 Ajax 简介2. Axios 和 Ajax 的区别3. 从 按钮 到 Axios请求后端接口的 大致顺序 1. Axios 和 Ajax 简介 Ajax&#xff08;Asynchronous JavaScript and XML&#xff09; 不是一种技术&#xff0c;而是一个编程技术概念&#xff0c;核心是通过 XMLHttpReques…

windows + visual studio 2019 使用cmake 编译构建静、动态库并调用详解

环境 windows visual studio 2019 visual studio 2019创建cmake工程 1. 静态库.lib 1.1 静态库编译生成 以下是我创建的cmake工程文件结构&#xff0c;只关注高亮文件夹部分 libout 存放编译生成的.lib文件libsrc 存放编译用的源代码和头文件CMakeLists.txt 此次编译CMak…

通过 VBA 在 Excel 中自动提取拼音首字母

在excel里面把表格里的中文提取拼音大写缩写怎么弄 在Excel中&#xff0c;如果你想提取表格中的中文字符并转换为拼音大写缩写&#xff08;即每个汉字的拼音首字母的大写形式&#xff09;&#xff0c;可以通过以下步骤来实现。这项工作可以分为两个主要部分&#xff1a; 提取拼…

通过环境变量实现多个 python 版本的自由切换以及 Conda 虚拟环境的使用教程

目录 Python 安装包的下载和安装通过环境变量的方式来切换不同的 Python 版本Pycharm 创建项目使用虚拟环境 使用虚拟环境管理工具 condaConda 教程1. **环境管理**创建虚拟环境激活虚拟环境退出虚拟环境列出所有虚拟环境删除虚拟环境导出虚拟环境配置从文件创建虚拟环境 2. **…

排序算法详解、应用对比与C语言实现

四种经典排序算法详解&#xff08;原理动图代码&#xff09; 一、排序算法的重要性 排序算法是计算机科学领域最基础的算法之一&#xff0c;在数据库索引、搜索引擎优化、大数据分析等领域有广泛应用。根据Stack Overflow 2022开发者调查&#xff0c;超过83%的面试会考察算法…

Python基于Django的微博热搜、微博舆论可视化系统(V3.0)【附源码】

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…

网络安全ids是什么意思

1、 简述IPS和IDS的异同点&#xff1b; 入侵检测系统&#xff08;IDS&#xff09; IDS&#xff08;Intrusion Detection Systems&#xff0c;入侵检测系统&#xff09;&#xff0c;专业上讲就是依照一定的安全策略&#xff0c;对网络、系统、运行状况进行监视&#xff0c;尽可能…

JVM春招快速学习指南

1.说在前面 在Java相关岗位的春/秋招面试过程中&#xff0c;JVM的学习是必不可少的。本文主要是通过《深入理解Java虚拟机》第三版来介绍JVM的学习路线和方法&#xff0c;并对没有过JVM基础的给出阅读和学习建议&#xff0c;尽可能更加快速高效的进行JVM的学习与秋招面试的备战…

json格式,curl命令,及轻量化处理工具

一. JSON格式 JSON&#xff08;JavaScript Object Notation&#xff09; 是一种轻量级的数据交换格式。它基于一个子集的JavaScript编程语言&#xff0c;使用人类易于阅读的文本格式来存储和表示数据。尽管名字中有“JavaScript”&#xff0c;但JSON是语言无关的&#xff0c;几…

echarts 3d中国地图飞行线

一、3D中国地图 1. 一定要使用 echarts 5.0及以上的版本; 2. echarts 5.0没有内置中国地图了。点击下载 china.json&#xff1b; 3. 一共使用了四层地图。 &#xff08;1&#xff09;第一层是中国地图各省细边框和展示南海诸岛&#xff1b; &#xff08;2&#xff09;第二层是…

从 0 开始本地部署 DeepSeek:详细步骤 + 避坑指南 + 构建可视化(安装在D盘)

个人主页&#xff1a;chian-ocean 前言&#xff1a; 随着人工智能技术的迅速发展&#xff0c;大语言模型在各个行业中得到了广泛应用。DeepSeek 作为一个新兴的 AI 公司&#xff0c;凭借其高效的 AI 模型和开源的优势&#xff0c;吸引了越来越多的开发者和企业关注。为了更好地…

[AI]Mac本地部署Deepseek R1模型 — — 保姆级教程

[AI]Mac本地部署DeepSeek R1模型 — — 保姆级教程 DeepSeek R1是中国AI初创公司深度求索&#xff08;DeepSeek&#xff09;推出大模型DeepSeek-R1。 作为一款开源模型&#xff0c;R1在数学、代码、自然语言推理等任务上的性能能够比肩OpenAI o1模型正式版&#xff0c;并采用MI…

Linux(socket网络编程)TCP连接

Linux&#xff08;socket网络编程&#xff09;TCP连接 基础文件目录函数系统进程控制函数fork()exec系列函数void abort(void)void assert(int expression)void exit(int status)void _exit(int status)int atexit(void (*func)(void))int on_exit(void (*function)(int,void*)…

408-数据结构

数据结构在学什么&#xff1f; 1.用代码把问题信息化 2.用计算机处理信息 ch1 数据&#xff1a;数据是信息的载体&#xff0c;是描述客观事物属性的数、字符及所有能输入到计算机中并被计算机程序识别和处理的符号的集合。数据是计算机程序加工的原料。 ch2 //假设线性表…