Kafka的消息存储机制

前面咱们简单讲了K啊开发入门相关的概念、架构、特点以及安装启动。 今天咱们来说一下它的消息存储机制。

前言:

Kafka通过将消息持久化到磁盘上的日志文件来实现高吞吐量的消息传递。

这种存储机制使得Kafka能够处理大量的消息,并保证消息的可靠性。

1、消息存储机制概述:

1.1 分区与副本:

Kafka将每个主题划分为一个或多个分区,每个分区可以有多个副本。分区和副本的概念为Kafka提供了水平扩展和故障容错的能力。

1.2 消息日志:

Kafka的消息存储机制基于消息日志的概念。消息被追加到一个或多个分区的日志文件中,每个分区都有一个单独的日志文件,其中的消息按顺序存储。

1.2.1 消息发送

每当往某个Topic发送数据时,数据会被hash到不同的partition,这些partition位于不同的集群节点上,所以每个消息都会被记录一个offset消息号,随着消息的增加逐渐增加,这个offset也会递增,同时,每个消息会有一个编号,就是offset号。消费者通过这个offset号去查询读取这个消息。

  • 发送消息流程

    1. 首先获取topic的所有Patition

    2. 如果客户端不指定Patition,也没有指定Key的话,使用自增长的数字取余数的方式实现指定的Partition。这样Kafka将平均的向Partition中生产数据。

    3. 如果想要控制发送的partition,则有两种方式,一种是指定partition,另一种就是根据Key自己写算法。继承Partitioner接口,实现其partition方法。

alt
  • 消息消费

    消费者有消费者族群的概念,当生产者将数据发布到topic时,消费者通过pull的方式,定期从服务器拉取数据,当然在pull数据的时候,,服务器会告诉consumer可消费的消息offset。

alt

2、源码解析与技术细节:

2.1 日志文件格式:

Kafka使用一种特殊的文件格式来存储消息日志,该格式称为“分段的日志(segmented log)”。

alt 根据你的需求,这里给出一个简单的针对Kafka日志文件源码解析的示例:

Kafka日志文件的源码实现位于Kafka项目的core模块中,主要包括以下几个关键类和接口:

  1. Log类:代表一个分区的日志文件,它负责对消息的追加、读取和索引等操作。在Log类中,核心的数据结构是Segment,它表示一个日志分段。

  2. Segment类:代表日志文件中的一个分段,是Kafka用于存储消息的基本单元。每个分段都有一个起始偏移量和一个结束偏移量,用于定位消息的位置。分段由多个消息组成,按照消息的追加顺序顺序存储。

  3. OffsetIndex类:用于支持高效的偏移量查找。Kafka在每个分段中维护一个偏移量索引,使得可以通过偏移量快速定位到消息的物理位置。

  4. OffsetPosition类:表示一个偏移量在日志文件中的位置信息,包括分段文件名、消息在文件中的位置和消息的大小等信息。

2.2 消息追加与索引:

Kafka使用追加写的方式将消息写入日志文件,并使用索引结构来提供高效地消息检索。我们将通过源码解析,详细讲解消息追加和索引的实现原理及相关技术细节。

下面是一个简化的示例,包括消息的追加、读取和索引等操作:

// Log类的部分关键源码
class Log(dir: File, config: LogConfig{
  // 初始化日志目录
  private val logDir = CoreUtils.createDirectory(dir)

  // 初始化日志片段
  private val segments: mutable.Map[LongLogSegment] = loadSegments()

  // 向日志中追加消息
  def append(messages: Seq[Message]): LogAppendInfo = {
    // ...
    // 将消息追加到当前活跃的日志片段中
    val currentSegment = segments(activeSegmentIndex)
    currentSegment.append(messages)
    // ...
  }

  // 从日志中读取消息
  def read(offset: Long, maxLength: Int): FetchDataInfo = {
    // ...
    // 根据偏移量找到对应的分段文件
    val segment = segments.floorEntry(offset).getValue
    segment.read(offset, maxLength)
    // ...
  }

  // 根据配置删除老旧的日志片段
  def deleteOldSegments(): Unit = {
    // ...
    // 删除所有小于日志保留大小的分段
    val deletableSegments = segments.filter(segment => segment.getSize <= config.retentionSize)
    deletableSegments.foreach {
      case (_, segment) => segment.delete()
    }
    // ...
  }

  // 加载已存在的日志片段
  private def loadSegments(): mutable.Map[LongLogSegment] = {
    // ...
    // 遍历日志目录下的所有分段文件,加载到内存中
    val segments = mutable.Map[LongLogSegment]()
    val segmentFiles = logDir.listFiles.filter(_.isFile).sortBy(_.getName)
    segmentFiles.foreach { file =>
      // 解析文件名中的偏移量
      val offset = parseOffset(file)
      // 创建并加载分段
      val segment = new LogSegment(file, offset)
      segment.load()
      // 添加到分段列表中
      segments.put(offset, segment)
    }
    // ...
    segments
  }
}

2.3 日志压缩:

在Kafka中,可以通过启用日志压缩来减小存储空间的占用和网络传输的开销。Kafka支持多种压缩算法,包括Gzip、Snappy和LZ4等。下面是一个简单的步骤,说明如何在Kafka中启用日志压缩:

  1. 在Kafka服务器配置文件中,找到以下配置项:

    compression.type = producer

    将该配置项的值设置为所需的压缩算法,例如:

    compression.type = gzip
  2. 如果你的Kafka集群有多个副本(replica),你还需要在Kafka服务器配置文件中为每个副本设置以下配置项:

    min.insync.replicas = 2

    该配置项指定了进行压缩的最小副本数,确保至少有指定数量的副本处于同步状态。这是为了防止数据丢失,在进行日志压缩时仍然能够保持高可靠性。

  3. 重启Kafka服务器,以使配置生效。

在启用日志压缩后,Kafka将会自动对生产者发送的消息进行压缩,并在消费者读取消息时自动解压缩。这样可以显著减小消息的存储空间和网络传输开销,提高系统的性能和效率。

需要注意的是,在启用日志压缩后,读写数据的性能会受到一些影响,因为压缩和解压缩需要一定的计算资源。因此,在选择压缩算法和配置压缩参数时,需要权衡存储空间的节省和性能的需求。

此外,还有一种在Kafka中压缩日志的方法是使用外部工具(如Hadoop的hadoop-archive-logs命令),先将日志文件打包成压缩文件(如tar.gz),然后再进行存储。这种方式需要额外的步骤和工具,并且不支持实时的压缩和解压缩。因此,如果需要实时的压缩和解压缩功能,建议使用Kafka内置的日志压缩功能。

3、存储性能优化:

优化Kafka存储性能可以提高消息的写入和读取速度,以及减少存储占用。下面是一些常见的Kafka存储性能优化策略建议:

  1. 批量发送:通过将多条消息合并为一个批次进行发送,可以减少网络传输开销和降低磁盘IO。在生产者端,可以设置batch.size参数来调整批次大小。较大的批次大小可以提高吞吐量,但可能会增加延迟。在消费者端,可以使用fetch.min.bytes参数来配置批量拉取的最小字节数,默认为1字节。

  2. 合理的副本因子:Kafka的消息是以副本的形式存储在不同的节点上。通过合理配置副本因子,可以在保证消息的可靠性的同时,提高写入性能。较小的副本因子可以减少副本间的同步开销。如有必要,可以将min.insync.replicas参数设置为小于副本因子的值。但同时要注意,较小的副本因子可能会增加消息的丢失风险。

  3. 启用压缩:Kafka支持对消息进行压缩,在减小存储占用和网络传输开销方面具有很大优势。可以配置生产者的compression.type参数来启用压缩功能,并选择合适的压缩算法(如Gzip、Snappy或LZ4)。压缩会增加一些额外的CPU开销,但通常能在存储和传输方面带来明显的性能收益。

  4. SSD存储:Kafka使用大量的磁盘IO,因此使用固态硬盘(SSD)可以显著提高性能。SSD具有更低的读写延迟和更高的吞吐量,适合处理大量的随机读写操作。

  5. 分区和副本的平衡:合理设置分区和副本的数量,可以提高负载均衡和并行处理能力。如果某个分区或副本的读写速度较慢,可以考虑增加其数量。

  6. 优化日志清理:Kafka会定期清理日志段文件来释放磁盘空间。通过调整log.retention.hourslog.retention.bytes等参数,可以控制日志的保留时间和大小。合理设置这些参数可以避免过早的数据清理和降低磁盘压力。

  7. 确保足够的磁盘带宽:Kafka的存储性能受限于磁盘带宽。确保磁盘子系统具有足够的带宽和IO吞吐量,可以避免磁盘成为性能瓶颈。

以上是一些常见的Kafka存储性能优化策略,根据实际情况和需求,可以选择适合的优化方法,并进行配置和调整。同时,定期监控系统性能并进行性能测试,可以帮助发现潜在的性能问题并进行进一步优化。

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/140691.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue重修003

文章目录 版权声明day03一、今日目标1.生命周期2.综合案例-小黑记账清单3.工程化开发入门4.综合案例-小兔仙首页 二、Vue生命周期三、Vue生命周期钩子四、生命周期钩子小案例1.在created中发送数据2.在mounted中获取焦点 五、案例-小黑记账清单1.需求图示&#xff1a;2.需求分析…

ChatGPT批量写作文章软件

什么是ChatGPT批量写作文章。简单来说&#xff0c;它是一种使用ChatGPT技术的方法&#xff0c;可以帮助您批量生成各种类型的文章和内容。无论您是需要新闻报道、博客文章、产品描述、社交媒体帖子还是其他类型的内容&#xff0c;ChatGPT都能满足您的需求。它可以在极短的时间内…

探索 GO 项目依赖包管理与Go Module常规操作

探索 GO 项目依赖包管理与Go Module常规操作 文章目录 探索 GO 项目依赖包管理与Go Module常规操作一.Go 构建模式的演变1.1 GOPATH &#xff08;初版&#xff09;1.1.1 go get 1.2 vendor 机制&#xff08;中版&#xff09;1.3 Go Module&#xff08;最新版&#xff09; 二.创…

C语言 cortex-A7核UART总线实验

一、C 1&#xff09;uart4.h #ifndef __UART4_H__ #define __UART4_H__ #include "stm32mp1xx_rcc.h" #include "stm32mp1xx_gpio.h" #include "stm32mp1xx_uart.h&quo…

4k、VR与万兆光网

“全光万兆”对VR意义重大。 pico4的分辨率 PICO 4 的单眼分辨率是 2160 2160&#xff0c;整体分辨率高达 4320 2160。这是一款高性能的 VR 一体机&#xff0c;采用了 2.56 英寸的 Fast-LCD 屏幕&#xff0c;最高可实现 90Hz 刷新率&#xff0c;还有 1200 PPI 和 20.6 PPD 的…

基于海康Ehome/ISUP接入到LiveNVR实现海康摄像头、录像机视频统一汇聚,做到物联网无插件直播回放和控制

LiveNVR支持海康NVR摄像头通EHOME接入ISUP接入LiveNVR分发视频流或是转GB28181 1、海康 ISUP 接入配置2、海康设备接入2.1、海康EHOME接入配置示例2.2、海康ISUP接入配置示例 3、通道配置3.1、直播流接入类型 海康ISUP3.2、海康 ISUP 设备ID3.3、启用保存3.4、接入成功 4、相关…

Mac磁盘空间满了怎么办?Mac如何清理磁盘空间

你是不是发现你的Mac电脑存储越来越满&#xff0c;甚至操作系统本身就占了100多G的空间&#xff1f;这不仅影响了电脑的性能&#xff0c;而且也让你无法存储更多的重要文件和软件。别担心&#xff0c;今天这篇文章将告诉你如何清除多余的文件&#xff0c;让你的Mac重获新生。 一…

Nginx的location作用

location Nginx 的 locaiton 作⽤是根据⽤户请求的 URI 不同&#xff0c;来执行不同的应用。针对用户请求的网站URL 进行匹配&#xff0c;匹配成功后进行对应的操作。 location [ | ~| ~* | ^~ ] url {#指定对应的动作 } 正则表达式解释 匹配符 匹配规则 优先级 精确匹配 1…

数据结构——二叉树层序遍历

链式二叉树的建立 前言一、层序遍历的概念和实现二、判断二叉树是否是完全二叉树总结 前言 来喽来喽~ 二叉树的层序遍历来喽~ 层序遍历那是相当有趣滴&#xff01; 我的朋友&#xff0c;请不要迷惘&#xff0c;你要记住&#xff0c;你终有鲲鹏一日&#xff01; 加油吧&#xf…

详解MySQL存储引擎

前言: 📕作者简介:热爱编程的小七,致力于C、Java、Python等多编程语言,热爱编程和长板的运动少年! 📘相关专栏Java基础语法,JavaEE初阶,数据库,数据结构和算法系列等,大家有兴趣的可以看一看。 😇😇😇有兴趣的话关注博主一起学习,一起进步吧! 一、MySQL存…

修改vscode底部栏背景和字体颜色

修改vscode底部栏背景和字体颜色 如图&#xff1a; 首先打开齿轮&#xff0c;打开设置搜索workbench.colorCustomizations,然后点击编辑setting.json修改setting.json内内容 "workbench.colorCustomizations": {"statusBar.foreground": "#FFFFFF…

5G通信与蜂窝模组之间的关系

5G通信是第五代移动通信技术的简称&#xff0c;它代表了一种新一代的无线通信技术标准。5G通信的主要目标是提供更高的数据传输速度、更低的延迟、更大的网络容量以及更可靠的连接&#xff0c;以支持各种新兴应用和服务&#xff0c;包括高清视频流、虚拟现实、物联网&#xff0…

安全生产一张图 安全生产三维地理信息平台

一、 建设目标 易图讯科技是一家专业从事大数据、移动互联网、物联网、三维GIS、AI系统研发&#xff0c;开发了三维电子沙盘、AI三维电子沙盘、WEB三维地球、移动端三维地球、数字武装三维电子沙盘、智慧动员三维电子沙盘、智慧公安三维电子沙盘、智慧安监三维电子沙盘、森林防…

【动手学深度学习-Pytorch版】序列到序列的学习(包含NLP常用的Mask技巧)

序言 这一节是对于“编码器-解码器”模型的实际应用&#xff0c;编码器和解码器架构可以使用长度可变的序列作为输入&#xff0c;并将其转换为固定形状的隐状态&#xff08;编码器实现&#xff09;。本小节将使用“fra-eng”数据集&#xff08;这也是《动手学习深度学习-Pytor…

linux 安装 wordpress

文章目录 linux 安装 wordpress1. wordpress 简介2. wordpress功能和特点3. 部署要求4. 环境搭建4.1 部署 nginx4.1.1 新增配置文件 4.2 部署 PHP74.2.1 查看当前版本4.2.2 YUM 安装 PHP74.2.3 查看 PHP 版本4.2.4 启动PHP-FPM4.2.5 修改配置文件4.2.6 重启服务 4.3 部署 mysql…

IDEA下使用Spring MVC

<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://ma…

【Git】轻松学会 Git:深入理解 Git 的基本操作

文章目录 前言一、创建 Git 本地仓库1.1 什么是仓库1.2 创建本地仓库1.3 .git 目录结构 二、配置 Git三、认识 Git 的工作区、暂存区和版本库3.1 什么是 Git 的工作区、暂存区和版本库3.2 工作区、暂存区和版本库之间的关系 四、添加文件4.1 添加文件到暂存区和版本库中的命令4…

设计模式之备忘录模式

文章目录 游戏角色状态恢复问题传统方案解决游戏角色恢复传统的方式的问题分析备忘录模式基本介绍游戏角色恢复状态实例备忘录模式的注意事项和细节 游戏角色状态恢复问题 游戏角色有攻击力和防御力&#xff0c;在大战 Boss 前保存自身的状态(攻击力和防御力)&#xff0c;当大…

操作系统权限提升(二十八)之数据库提权-SQL Server 数据库安装

SQL Server 数据库安装 SQL Server介绍 SQL Server 是Microsoft 公司推出的关系型数据库管理系统。具有使用方便可伸缩性好与相关软件集成程度高等优点,可跨越从运行Microsoft Windows 98 的膝上型电脑到运行Microsoft Windows 2012 的大型多处理器的服务器等多种平台使用。…

ultraEdit正则匹配多行(xml用)

在ultraEdit中&#xff0c;我想选取<channel到</channel>之间的多行&#xff08;进行删除&#xff09;。在perl模式下&#xff0c;命令为“<channel[\s\S]?</channel>”。下面是xml文件&#xff1a; <!--This XML file does not appear to have any sty…