Kafka 为什么这么快?

Kafka 是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。

1 批量发送

Kafka 收发消息都是批量进行处理的。我们看一下 Kafka 生产者发送消息的代码:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {//省略前面代码Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);//把消息追加到之前缓存的这一批消息上RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);//积累到设置的缓存大小,则发送出去if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch /**省略 catch 代码*/
}

从代码中可以看到,生产者调用 doSend 方法后,并不会直接把消息发送出去,而是把消息缓存起来,缓存消息量达到配置的批量大小后,才会发送出去。

注意:从上面 accumulator.append 代码可以看到,一批消息属于同一个 topic 下面的同一个 partition。

Broker 收到消息后,并不会把批量消息解析成单条消息后落盘,而是作为批量消息进行落盘,同时也会把批量消息直接同步给其他副本。

消费者拉取消息,也不会按照单条进行拉取,而是按照批量进行拉取,拉取到一批消息后,再解析成单条消息进行消费。

使用批量收发消息,减轻了客户端和 Broker 的交互次数,提升了 Broker 处理能力。

2 消息压缩

如果消息体比较大,Kafka 消息吞吐量要达到千万级别,网卡支持的网络传输带宽会是一个瓶颈。Kafka 的解决方案是消息压缩。发送消息时,如果增加参数 compression.type,就可以开启消息压缩:

public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//开启消息压缩props.put("compression.type", "gzip");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {logger.error("sending message error: ", e);} else {logger.info("sending message successful, Offset: ", metadata.offset());}}});producer.close();
}

如果 compression.type 的值设置为 none,则不开启压缩。那消息是在什么时候进行压缩呢?前面提到过,生产者缓存一批消息后才会发送,在发送这批消息之前就会进行压缩,代码如下:

public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// ...try {// ...buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {//...RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}//这批消息缓存已满,这里进行压缩MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}
}

上面的 recordsBuilder 方法最终调用了下面 MemoryRecordsBuilder 的构造方法。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,byte magic,CompressionType compressionType,TimestampType timestampType,long baseOffset,long logAppendTime,long producerId,short producerEpoch,int baseSequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int writeLimit) {//省略其他代码this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法会根据配置的压缩算法进行压缩或者选择不压缩。目前 Kafka 支持的压缩算法包括:gzip、snappy、lz4,从 2.1.0 版本开始,Kafka 支持 Zstandard 算法。

在 Broker 端,会解压 header 做一些校验,但不会解压消息体。消息体的解压是在消费端,消费者拉取到一批消息后,首先会进行解压,然后进行消息处理。

因为压缩和解压都是耗费 CPU 的操作,所以在开启消息压缩时,也要考虑生产者和消费者的 CPU 资源情况。

有了消息批量收集和压缩,kafka 生产者发送消息的过程如下图:

图片

3 磁盘顺序读写

顺序读写省去了寻址的时间,只要一次寻址,就可以连续读写。

在固态硬盘上,顺序读写的性能是随机读写的好几倍。而在机械硬盘上,寻址时需要移动磁头,这个机械运动会花费很多时间,因此机械硬盘的顺序读写性能是随机读写的几十倍。

Kafka 的 Broker 在写消息数据时,首先为每个 Partition 创建一个文件,然后把数据顺序地追加到该文件对应的磁盘空间中,如果这个文件写满了,就再创建一个新文件继续追加写。这样大大减少了寻址时间,提高了读写性能。

4 PageCache

在 Linux 系统中,所有文件 IO 操作都要通过 PageCache,PageCache 是磁盘文件在内存中建立的缓存。当应用程序读写文件时,并不会直接读写磁盘上的文件,而是操作 PageCache。

图片

应用程序写文件时,都先会把数据写入 PageCache,然后操作系统定期地将 PageCache 的数据写到磁盘上。如下图:

图片

而应用程序在读取文件数据时,首先会判断数据是否在 PageCache 中,如果在则直接读取,如果不在,则读取磁盘,并且将数据缓存到 PageCache。

图片

Kafka 充分利用了 PageCache 的优势,当生产者生产消息的速率和消费者消费消息的速率差不多时,Kafka 基本可以不用落盘就能完成消息的传输。

5 零拷贝

Kafka Broker 将消息发送给消费端时,即使命中了 PageCache,也需要将 PageCache 中的数据先复制到应用程序的内存空间,然后从应用程序的内存空间复制到 Socket 缓存区,将数据发送出去。如下图:

图片

Kafka 采用了零拷贝技术把数据直接从 PageCache 复制到 Socket 缓冲区中,这样数据不用复制到用户态的内存空间,同时 DMA 控制器直接完成数据复制,不需要 CPU 参与。如下图:

图片

Java 零拷贝技术采用 FileChannel.transferTo() 方法,底层调用了 sendfile 方法。

6 mmap

Kafka 的日志文件分为数据文件(.log)和索引文件(.index),Kafka 为了提高索引文件的读取性能,对索引文件采用了 mmap 内存映射,将索引文件映射到进程的内存空间,这样读取索引文件就不需要从磁盘进行读取。如下图:

图片

7 总结

本文介绍了 Kafka 实现高性能用到的关键技术,这些技术可以为我们学习和工作提供参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/429510.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PowerMill 2025简体中文版百度云资源分享下载

如大家所了解的&#xff0c;PowerMill是一款专业的CAM&#xff08;计算机辅助制造&#xff09;软件。主要用于加工行业&#xff0c;可以帮助用户进行高效、精准的加工工艺设计和数控编程&#xff0c;以达到生产部件的高精度和高质量。 对于初次接触的小伙伴来说&#xff0c;目…

GAMES101(15节)

Irradiance辐射度量学 辐射度量学在渲染领域&#xff0c;可以帮助理解基于物理的光照模型 radiant energy辐射能量Q&#xff0c;累计总能量&#xff08;单位J joule焦耳&#xff09;&#xff0c;就像太阳能板&#xff0c;光照时间越长接收能量越多&#xff0c;收到的能量总和…

前后端分离,使用MOCK进行数据模拟开发,让前端攻城师独立于后端进行开发

mock是什么 Mock生成随机数据,拦截Ajax 请求&#xff0c;前后端分离&#xff0c;让前端攻城师独立于后端进行开发。 增加单元测试的真实性 通过随机数据,模拟各种场景。 在实际开发过程中&#xff0c;前端是通过axios来请求数据的&#xff0c;很多时候前端开发者就是通过写固定…

浅谈Spring Cloud:OpenFeign

RestTemplate 方式调用存在的问题&#xff1a; String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class); 这是通过URL地址来访问的。但是&#xff1a; 代码可读性差&#xff0c;编程体验不统一参数复…

Lucene详解介绍以及底层原理说明

文章目录 什么是Lucene?示意图Lucene 的使用场景&#xff1a;Lucene 的生态系统&#xff1a; 相关概念1. **Document&#xff08;文档&#xff09;**2. **Field&#xff08;字段&#xff09;**3. **Analyzer&#xff08;分析器&#xff09;**4. **Tokenizer&#xff08;分词器…

前端基础知识+算法(一)

文章目录 算法二分查找条件注意方式基本原理左闭右闭正向写法 左闭右开正向写法 前端基础知识定时器及清除盒子垂直水平居中的方式垂直水平1.flex布局2.grid布局3.定位对于块级元素 解决高度塌陷的方式1.给父元素一个固定的高度2.给父元素添加属性 overflow: hidden;3.在子元素…

深度学习-从零基础快速入门到项目实践,这本书上市了!!!

此书地址&#xff1a; 《【2024新书】深度学习 从零基础快速入门到项目实践 文青山 跟我一起学人工智能 机器学习算法原理代码实现教程 深度学习项目分析 深度学习 从零基础快速入门到项目实践》【摘要 书评 试读】- 京东图书 除深度学习外我还写了一本软件测试书。我大概是国…

[Excel VBA办公]如何使用VBA批量删除空行

在处理Excel数据时&#xff0c;空行可能会干扰数据分析和展示。以下是一个VBA代码示例&#xff0c;帮助你批量删除工作表中的空行。 1. 代码说明 此代码将遍历指定工作表&#xff0c;删除所有空行&#xff0c;确保数据整洁。 2. VBA代码 删除sheet1的空行 Sub DeleteEmptyRow…

VMware虚拟机因磁盘空间不足黑屏无法登录

在虚拟机里存储了一些文件之后&#xff0c;再打开发现进不去了&#xff0c;只有光标一直在左上角&#xff0c;登录的框都是黑的&#xff0c;具体如下&#xff1a; 明明知道登录框的存在却怎么也触碰不到它T_T &#xff0c;先说解决方法&#xff1a; 产生这个问题的原因是因为磁…

大数据Flink(一百二十一):Flink CDC基本介绍

文章目录 Flink CDC基本介绍 一、什么是CDC 二、CDC的实现机制 三、​​​​​​​​​​​​​​传统 CDC ETL 分析 四、​​​​​​​​​​​​​​基于 Flink CDC 的 ETL 分析 五、​​​​​​​​​​​​​​什么是 Flink CDC 六、​​​​​​​​​​​​​​…

OpenCV特征检测(10)检测图像中直线的函数HoughLinesP()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在二值图像中使用概率霍夫变换查找线段。 该函数实现了用于直线检测的概率霍夫变换算法&#xff0c;该算法在文献 181中有所描述。 HoughLines…

go webapi上传文件

一、导入依赖 import "net/http" 我这里用到了Guid所以安装依赖 go get github.com/google/uuid 二、main.go package mainimport ("fmt""github.com/jmoiron/sqlx""github.com/tealeg/xlsx""log""path/filepath&q…

Cpp类和对象(中续)(5)

文章目录 前言一、赋值运算符重载运算符重载赋值运算符重载赋值运算符不可重载为全局函数前置和后置的重载 二、const修饰成员函数三、取地址及const取地址操作符重载四、日期类的实现构造函数日期 天数日期 天数日期 - 天数日期 - 天数日期类的大小比较日期类 > 日期类日…

【CSS in Depth 2 精译_036】5.6 Grid 网格布局中与对齐相关的属性 + 5.7本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一章 层叠、优先级与继承&#xff08;已完结&#xff09; 1.1 层叠1.2 继承1.3 特殊值1.4 简写属性1.5 CSS 渐进式增强技术1.6 本章小结 第二章 相对单位&#xff08;已完结&#xff09; 2.1 相对…

桶排序和计数排序(非比较排序算法)

桶排序 桶排序是一种基于分配的排序算法&#xff0c;特别适合用来排序均匀分布的数据。它的基本思想是将输入的数据分到有限数量的桶里&#xff0c;然后对每个桶内的数据分别进行排序&#xff0c;最后再将各个桶内的数据合并得到最终的排序结果。(通常用于浮点数&#xff0c;因…

Go-知识-定时器

Go-知识-定时器 1. 介绍2. Timer使用场景2.1 设定超时时间2.2 延迟执行某个方法 3. Timer 对外接口3.1 创建定时器3.2 停止定时器3.3 重置定时器3.4 After3.5 AfterFunc 4. Timer 的实现原理4.1 Timer数据结构4.1.1 Timer4.1.2 runtimeTimer 4.2 Timer 实现原理4.2.1 创建Timer…

【Godot4.3】基于状态切换的游戏元素概论

提示 本文的设想性质比较大,只是探讨一种设计思路。完全理论阶段&#xff0c;不可行就当是闹了个笑话O(∩_∩)O哈哈~但很符合我瞎搞的气质。 概述 一些游戏元素&#xff0c;其实是拥有多个状态的。比如一个宝箱&#xff0c;有打开和关闭两个状态。那么只需要设定两个状态的图…

并发编程多线程

1.线程和进程的区别&#xff1f; 进程是正在运行程序的实例&#xff0c;进程中包含了线程&#xff0c;每个线程执行不同的任务不同的进程使用不同的内存空间&#xff0c;在当前进程下的所有线程可以共享内存空间线程更轻量&#xff0c;线程上下文切换成本一般上要比进程上下文…

axure的下载,激活,汉化全过程,多图

1.前言 下载地址&#xff1a;https://pan.baidu.com/s/12xo1mJer2hmBK7QrYM5v-Q?pwd0107#list/path%2Fcsdn%E5%85%B1%E4%BA%AB%E6%96%87%E4%BB%B6 源文章&#xff1a;https://blog.csdn.net/iwanttostudyc/article/details/123773796?ops_request_misc%257B%2522request%25…

STM32 单片机最小系统全解析

STM32 单片机最小系统全解析 本文详细介绍了 STM32 单片机最小系统&#xff0c;包括其各个组成部分及设计要点与注意事项。STM32 最小系统在嵌入式开发中至关重要&#xff0c;由电源、时钟、复位、调试接口和启动电路等组成。 在电源电路方面&#xff0c;采用 3.3V 直流电源供…