【《Kafka 入门指南:从零基础到精通》】

前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索KAFKA,本篇文章主要讲述了:消息队列的基础知识,KAFKA消息队列等等。欢迎大家一起探索讨论!!!
💞💞代码是你的画笔,创新是你的画布,用它们绘出属于你的精彩世界,不断挑战,无限可能!

个人主页⭐: 书生♡
gitee主页🙋‍♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬

目录

  • 1. 消息队列
    • 1.1 消息队列概念
    • 1.2 消息队列的主要特点
    • 1.3 消息队列应用场景
    • 1.4 消息队列的模式
      • 1.4.1 点对点模式
      • 1.4.2 发布与订阅
      • 1.4.3 总结
    • 1.5 常用消息队列比较
  • 2. Kafka消息队列
    • 2.1 Kafka的概念
    • 2.2 Kafka 的特性
    • 2.3 Kafka 的架构
  • 3. kafka的启动
  • 4. Kafka的使用
    • 4.1 主题操作
      • 4.1.1 创建主题
      • 4.1.2 查看主题
      • 4.1.3 增加主题分区数
      • 4.1.4 删除主题
    • 4.2 读取写入消息
    • 4.3 kafka tool工具
    • 4.3.1 使用Kafka Tool
      • 4.3.2 使用工具写入数据
  • 5. spark操作kafka
    • 5.1 读取消息数据
    • 5.2 写入数据
      • 5.2.1 粘性写入策略
      • 5.2.2 指定partition写入策略
      • 5.2.3 指定key写入策略
    • 5.3 读取写入演示
  • 6. python操作kafka
    • 6.1 读取消息数据
    • 6.2 写入消息数据
    • 6.3 演示操作
  • 7. kafka消息数据存储机制
    • 7.1 kafka消息数据存储介绍
      • Kafka 存储架构概述
    • 7.2 存储文件
    • 7.3 修改段文件的配置
  • 8. kafka读写流程
    • 8.1 写入消息流程
    • 8.2 读取消息流程

1. 消息队列

1.1 消息队列概念

消息队列是一种软件系统,它允许在两个或多个进程之间发送消息。这种通信模式非常适合分布式系统,因为它提供了进程间通信的解耦机制,使得生产者和消费者可以独立地进行扩展而不影响彼此。

  • 定义: 存储计算机之间传递的消息的容器(消息队列是用来存储传递的消息)
  • 作用: 存储数据, 防止数据丢失

在这里插入图片描述

1.2 消息队列的主要特点

消息队列的主要特点包括:

  1. 异步通信:消息队列允许一个程序向另一个程序发送消息,而不需要等待接收方处理完毕。
    • 同步: 同时执行, 应用B完成上一次指令后应用A再次发送指令
    • 异步: 应用之间不需要进行等待
  2. 解耦:发送者和接收者并不需要直接相互了解。它们只需要知道消息队列的存在。(解除应用之间的耦合性(应用之间的直接关联关系))
  3. 可靠性:消息队列通常会保证消息至少被传递一次,并且在消息被成功处理之前不会丢失。
  4. 可扩展性:由于消息队列可以处理大量并发的消息,因此很容易扩展系统以处理更多的负载。

常见的消息队列技术包括:

  • AMQP (Advanced Message Queuing Protocol):一种提供统一消息服务的应用层协议。
  • RabbitMQ:基于AMQP的一个消息中间件。
  • Apache Kafka:最初由LinkedIn开发,现在是Apache项目的一部分,主要用于构建实时数据管道和流应用。
  • Amazon Simple Notification Service (SNS)Simple Queue Service (SQS):AWS提供的消息服务。

1.3 消息队列应用场景

  • 解耦

    • 解除应用之间的耦合性(应用之间的直接关联关系)
      在这里插入图片描述
  • 异步

    • 同步: 同时执行, 应用B完成上一次指令后应用A再次发送指令
    • 异步: 应用之间不需要进行等待
      在这里插入图片描述
  • 削峰

    • 解决高并发问题
    • 某一刻突然产生大量数据, 应用无法立即处理完, 可以将数据存储在消息队列中
      在这里插入图片描述

1.4 消息队列的模式

消息队列是一种常用的设计模式,用于实现应用程序之间的异步通信。消息队列允许一个或多个生产者(Producer)发送消息到队列中,而一个或多个消费者(Consumer)可以从队列中取出消息并进行处理。这种模式有助于解耦系统组件,并提高系统的可扩展性和可靠性。

  1. 点对点(Point-to-Point, P2P)模式

    • 每个消息只被一个消费者消费。
    • 生产者发送消息后,消息队列负责将消息传递给一个消费者。
    • 一旦消息被消费者接收,它就会从队列中移除。
  2. 发布/订阅(Publish/Subscribe, Pub/Sub)模式

    • 生产者发布消息到一个特定的主题(Topic)。
    • 多个消费者可以订阅同一个主题。
    • 所有订阅该主题的消费者都会接收到生产者发布的消息副本。
  3. 请求/响应(Request/Response)模式

    • 一个客户端发送请求到服务器。
    • 服务器处理请求并返回响应。
    • 这种模式通常使用应答队列来处理往返通信。
  4. 广播(Broadcast)模式

    • 生产者发送消息到所有消费者。
    • 这种模式类似于 Pub/Sub,但通常不涉及订阅机制,而是所有消费者都能直接接收到消息。
  5. 扇入/扇出(Fan-In/Fan-Out)模式

    • 扇入:多个生产者将消息发送到一个队列,由一个或多个消费者处理。
    • 扇出:一个生产者将消息发送到多个队列,由不同的消费者分别处理。
  6. 优先级队列(Priority Queue)模式

    • 消息具有不同的优先级。
    • 消费者总是先处理优先级最高的消息。
  7. 工作队列(Work Queue)模式

    • 工作队列用于分发任务到多个工作者(Worker)之间。
    • 每个任务只被一个工作者处理一次。

1.4.1 点对点模式

点对点(Point-to-Point, P2P)模式:

在点对点模式下,消息被发送到队列中,每个消息只能被一个消费者接收。一旦消息被接收,它就会从队列中移除。这意味着消息是专有的,并且不会被其他消费者重复处理。

点对点模式包括三个角色:消息队列发送者(生产者)、接收者(消费者)

特点:

  • 每个消息只被一个消费者消费。
  • 每个消息只有一个接收者(Consumer),一旦被消费,消息就不再存在于消息队列中。
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息。
  • 接收者在成功接收消息之后需要向队列应答成功,以便消息队列删除当前接收的消息。
  • 消费者可以是临时的,即消费者不需要事先存在就能接收消息。
  • 如果消费者在消息到达之前没有连接,则会丢失消息,除非使用持久化或其他机制。

消息发送者生产消息发送到queue中,然后消息接收者从queue中取出并且消费消息。消息被消费后,queue中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

实际例子:
假设有一个电商网站,每当有新的订单产生时,需要通过电子邮件通知客服部门。在这个场景中,我们可以设置一个订单队列,每当有新订单时,订单系统将订单信息作为消息发送到这个队列中。客服部门的邮件服务订阅这个队列,一旦有新消息出现,就立即处理(发送邮件通知)。因为每条订单消息只需要被处理一次,所以这是一个典型的点对点模式的应用场景。

在这里插入图片描述

1.4.2 发布与订阅

发布/订阅(Publish/Subscribe, Pub/Sub)模式

在发布/订阅模式下,消息的生产者并不直接与消费者通信。相反,生产者将消息发送到一个特定的主题(Topic),而消费者则订阅这些主题。所有订阅了同一主题的消费者都会接收到生产者发送的所有消息。

发布/订阅模式包括三个角色:角色主题(Topic)、发布者(Publisher)、订阅者(Subscriber)。

特点:

  • 生产者和消费者之间是松散耦合的。
  • 消费者可以订阅多个主题。
  • 每个消息可以有多个订阅者。
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行。
  • 同一条消息可以被多个消费者接收。

实际例子:
假设有一个新闻聚合器应用,它需要从多个来源收集新闻并将其展示给用户。在这种情况下,不同类型的新闻源(如体育、科技、娱乐等)可以被视为不同的主题。新闻聚合器作为生产者,将新闻发送到相应的主题中。用户(消费者)可以根据自己的兴趣订阅不同的主题,比如订阅“科技”和“娱乐”主题。这样,每当有新的科技或娱乐新闻时,订阅了相应主题的用户就会接收到这些新闻。
在这里插入图片描述

1.4.3 总结

  • 点对点模式适用于消息需要被单一消费者处理的情况,例如订单处理或文件上传任务。
  • 发布/订阅模式适用于消息需要被多个消费者同时处理的情况,例如实时数据更新或新闻推送。

1.5 常用消息队列比较

在这里插入图片描述

  • 如果消息队列不是将要构建系统的重点,对消息队列功能和性能没有很高的要求,只需要一个快速上手易于维护的消息队列,建议使用 RabbitMQ
  • 如果系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,需要低延迟和高稳定性,建议使用 RocketMQ
  • 如果需要处理海量的消息,像收集日志、监控信息或是埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合的消息队列。

2. Kafka消息队列

2.1 Kafka的概念

Apache Kafka 是一款高性能、分布式、基于发布/订阅的消息系统,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的一个顶级项目。Kafka 被设计用于处理大量实时数据流,它的设计目标是提供高吞吐量、低延迟的数据传输能力,并且能够处理大规模的数据流。

Kafka 的核心概念

  1. 主题(Topics)

    • 主题是 Kafka 中消息分类的一种方式。
    • 生产者将消息发送到特定的主题,而消费者则订阅这些主题来接收消息。
    • 一个主题可以有多个分区(Partitions),以支持分布式存储和处理。
  2. 分区(Partitions)

    • 每个主题可以分为多个分区,这使得数据可以分布在多个服务器上。
    • 分区有助于提高系统的吞吐量和可用性。
    • 每个分区是一个有序的消息序列。
  3. 偏移量(Offsets)

    • 偏移量是消费者跟踪其在分区中位置的方式。
    • 消费者可以使用偏移量来确定从哪个位置开始读取消息。
  4. 生产者(Producers)

    • 生产者是向 Kafka 主题发布消息的客户端。
    • 生产者可以选择将消息发布到特定的主题和分区。
  5. 消费者(Consumers)

    • 消费者是从 Kafka 主题中读取消息的客户端。
    • 消费者可以组成消费者组(Consumer Groups),以便更好地管理消息的处理。
  6. 消费者组(Consumer Groups)

    • 消费者组是一组可以一起工作的消费者。
    • 每个消费者组内的消费者可以分配不同的分区,从而实现并行处理。
    • 当消息被一个消费者组内的任意消费者消费后,其他消费者就不会再接收到这条消息。
  7. Broker

    • Broker 是 Kafka 集群中的服务器节点。
    • Kafka 集群由一个或多个 Broker 组成。
    • Broker 负责存储消息、维护元数据、处理生产者和消费者的请求。

2.2 Kafka 的特性

Kafka 的特性

  • 高吞吐量:Kafka 能够处理大量的数据流,每秒可以处理数百万条消息。
  • 低延迟:Kafka 能够实现实时数据处理,延迟极低。
  • 持久性:Kafka 使用磁盘存储消息,因此消息不会丢失。
  • 容错性:Kafka 支持复制和分区,即使某些节点失败也能保证数据完整性和服务可用性。
  • 可伸缩性:Kafka 可以很容易地在集群中添加或删除节点。

使用场景

  • 日志收集:Kafka 常用于收集和聚合来自不同数据源的日志数据。
  • 流式处理:Kafka 可以作为实时数据流处理管道的基础,例如与 Apache Storm 或 Apache Flink 结合使用。
  • 消息系统:Kafka 可以替代传统的消息队列系统,如 RabbitMQ 或 ActiveMQ。
  • 事件处理:Kafka 可以用于处理各种事件,例如用户活动、设备状态变化等。

2.3 Kafka 的架构

在这里插入图片描述

  • 发布者(producer): 将生产的消息数据存储到消息队列的角色主题
  • 订阅者(consumer): 逻辑上是一个消费者组(由多个消费者组成), 消费消息队列中的消息数据(数据处理)
    • 消费者组中的每个消费者消费同一个主题下的不同分区消息
  • 消息队列(kafka)
    • broker: 服务器代理节点, 每台服务器上的kafka就是一个broker
    • topic: 角色主题, 人为地将消息数据分类
    • partition: 分区, 角色主题下的消息数据是分区(RDD分区,HDFS分块)存储
    • Replication: 副本, 每个分区的消息数据可以有多个副本
      • leader: 领导者副本, 处理消息的读写请求, 由broker主节点选取领导者
      • follower: 追随者副本, 只是用于备份
  • zookeeper: 管理消息队列
    • 选择kafka的broker主节点(控制器)
    • 管理消息队列中的元数据(主题名称, 偏移量等)
  • 偏移量(offset)
    • 消息数据是有顺序存储, 每个消息都有一个偏移量
    • 在同一个分区中, 消息数据按顺序存储, 偏移量是从0开始
    • 不同分区的消息数据是没有顺序, 偏移量都是从0开始

3. kafka的启动

  1. 安装 Kafka
    首先需要安装 Kafka 和 ZooKeeper。ZooKeeper 是 Kafka 集群所需的协调服务。

下载和安装 下载:访问 Apache Kafka 的官方网站 (http://kafka.apache.org/downloads)
下载最新版本的 Kafka。 解压:将下载的压缩包解压到一个合适的目录中。

  1. 配置
    ZooKeeper:编辑 config/zookeeper.properties 文件来配置 ZooKeeper 的参数。
    Kafka:编辑 config/server.properties 文件来配置 Kafka 的参数,例如监听端口、日志目录等。
  2. 启动 ZooKeeper 和 Kafka
    启动 ZooKeeper:运行 zookeeper-server-start.sh config/zookeeper.properties。
    启动 Kafka:运行 kafka-server-start.sh -daemon /export/server/kafka/config/server.properties

在这里插入图片描述
停止kafka

kafka-server-stop.sh

4. Kafka的使用

4.1 主题操作

4.1.1 创建主题

kafka-topics.sh --bootstrap-server node1:9092 --create --partitions 分区数 --replication-factor 副本数 --topic 主题名

  • –bootstrap-server: 连接kakfa
  • –create: 创建命令
  • –partitions: 分区数, 不设置默认为1
  • –replication-factor: 副本数, 不设置默认为1
kafka-topics.sh --bootstrap-server node1:9092 --create --partitions 3 --replication-factor 3 --topic itcast

在这里插入图片描述

4.1.2 查看主题

list: 查看所有主题名称
–describe: 查看所有主题详情信息
–topic:查看某个具体主题详请信息

查看全部分区列表
kafka-topics.sh --bootstrap-server node1:9092 --list

# list: 查看所有主题名称
kafka-topics.sh --bootstrap-server node1:9092 --list

在这里插入图片描述

查看全部分区的描述
kafka-topics.sh --bootstrap-server node1:9092 --describe

# --describe: 查看所有主题详情信息
kafka-topics.sh --bootstrap-server node1:9092 --describe

在这里插入图片描述

查看指定分区的信息
kafka-topics.sh --bootstrap-server node1:9092 --describe --topic 主题名


# --topic:查看某个具体主题详请信息
kafka-topics.sh --bootstrap-server node1:9092 --describe --topic it

在这里插入图片描述

分区信息
在这里插入图片描述

4.1.3 增加主题分区数

只能增加分区数, 不能减少(减少分区会丢失消息数据)

创建好主题后不能修改副本数, 副本中涉及消息元数据, 不能同步元数据

  • 必须要增加副本的话, 只能重新创建主题设置好副本数, 然后将原主题消息迁移到新主题中

语法:
kafka-topics.sh --bootstrap-server node1:9092 --alter --partitions 设置的分区数 --topic 主题名

# --alter: 修改命令
kafka-topics.sh --bootstrap-server node1:9092 --alter --partitions 3 --topic it

在这里插入图片描述

4.1.4 删除主题

–delete: 删除命令
kafka-topics.sh --bootstrap-server node1:9092 --delete --topic 主题名

# --delete: 删除命令
# 也可以在每台服务器节点上通过 rm -rf 删除目录 linux指令删除
# 延迟删除: 不会立即删除目录
kafka-topics.sh --bootstrap-server node1:9092 --delete --topic it

在这里插入图片描述

4.2 读取写入消息

此操作是kafka提供的测试服务, 测试kafka读写消息功能是否正常

  • 写入数据

创建生产者应用程序, 将其他系统产生的消息数据写入到kafka中
多个生产者可以同时将消息写入到同一个主题

kafka-console-producer.sh --broker-list node1:9092 --topic 主题名

  • kafka-console-producer.sh: 生产脚本
  • –broker-list: 连接kafka
# kafka-console-producer.sh: 生产脚本
# --broker-list: 连接kafka
kafka-console-producer.sh --broker-list node1:9092 --topic it

在这里插入图片描述

  • 读取数据

创建消费者应用程序, 读取kafka中存储的消息数据进行数据处理操作
多个消费者可以同时消费同一个主题消息

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic 主题名

  • kafka-console-consumer.sh: 消费脚本
  • –bootstrap-server: 连接kafka
# kafka-console-consumer.sh: 消费脚本
# --bootstrap-server: 连接kafka
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic it

在这里插入图片描述

  • 数据传递

注意点:写入数据和读取不能在一个页面中,可以在一台服务器的两个客户端中,也可以在不同的服务器中
读取数据只能读取连接之后写入的数据,未连接之前写入的是读取不了的

node1上写入数据
在这里插入图片描述
查看node2(成功读取刚刚写入的数据)在这里插入图片描述

4.3 kafka tool工具

4.3.1 使用Kafka Tool

我们一系列Kafka的操作是可以使用工具的
这个工具就叫kafka tool

安装好之后
在这里插入图片描述

修改node1下的 /etc/ssh/sshd_config 配置文件

  • 将sshd_config文件中 UseDNS yes 改为 UseDNS no,并取消注释
  • 重启sshd服务 systemctl restart sshd.service
  • 在windows系统的 C:\Windows\System32\drivers\etc\hosts 文件中添加映射
192.168.88.100 node1 node1.itcast.cn
192.168.88.101 node2 node2.itcast.cn
192.168.88.102 node3 node3.itcast.cn

连接服务器
在这里插入图片描述
三台都连接,窗机node1 ,node2,node3 变为绿色就可以了
在这里插入图片描述
创建删除查看主题,点击tyopic,右键可以创建主题和刷新,
点击进入页面之后,就可以窗创建。删除和其他的操作。
在这里插入图片描述

4.3.2 使用工具写入数据

在这里插入图片描述
在这里插入图片描述
查看数据
在这里插入图片描述
查看其他服务器
在这里插入图片描述
写入数据的方式
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. spark操作kafka

5.1 读取消息数据

我们使用spark.read.load(format='kafka', **options)读取数据,
**options里面是我们的配置文件

  • kafka.bootstrap.servers — kafka集群地址
  • subscribe ---- 订阅角色主题
  • startingOffsets — 起始偏移量,可选参数, 起始偏移量, -2->从第一条消息开始
  • endingOffsets ---- 结束偏移量,可选参数, 结束偏移量, -1->到最后一条消息结束(包含)

注意点:
① 左闭右开原则
② 起始偏移量不能超过结束偏移量
③ 必须要指定所有分区的偏移量
④ 起始偏移量不写, 表示从第一条消息开始;结束偏移量不写, 表示到最后一条消息结束(包含)

# 当前应用程序是消费者(订阅者)
from pyspark.sql import SparkSession# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()# 读取kafka数据源的消息数据
"""
startingOffsets:可选参数, 起始偏移量, -2->从第一条消息开始
endingOffsets:可选参数, 结束偏移量, -1->到最后一条消息结束(包含)
注意点:
① 左闭右开原则
② 起始偏移量不能超过结束偏移量
③ 必须要指定所有分区的偏移量
④ 起始偏移量不写, 表示从第一条消息开始; 结束偏移量不写, 表示到最后一条消息结束(包含) 
"""
options = {# kafka集群地址: ip:port'kafka.bootstrap.servers': 'node1:9092',# 订阅角色主题'subscribe': 'itcast',# 根据偏移量获取消息数据# {主题名称:{'分区编号':偏移量}}# -2:起始偏移量, 从第一条消息开始  -1:结束偏移量, 到最后一条消息数据'startingOffsets': """{"itcast":{"0":0, "1":2, "2":-2}}""",'endingOffsets': """{"itcast":{"0":2, "1":-1, "2":-1}}"""
}
df = spark.read.load(format='kafka', **options)
df.show()
"""
key:key值, kafka中的非结构化数据是以key-value形式存储
value:消息数据, 字节bytes类型
topic:主题
partition:分区
offset:偏移量
timestamp:时间
timestampType:时间类型
+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
"""
# 将bytes字节类型消息数据转换成字符串类型
df2 = df.select(df['value'].cast('string'), 'topic', 'partition', 'offset')
df2.show()

key:key值, kafka中的非结构化数据是以key-value形式存储 value:消息数据, 字节bytes类型 topic:主题
partition:分区
offset:偏移量
timestamp:时间
timestampType:时间类型
±—±-------------------±-----±--------±-----±-------------------±-------- | key| value| topic|partition|offset| timestamp|timestampType|
±—±-------------------±-----±--------±-----±-------------------±--------

5.2 写入数据

5.2.1 粘性写入策略

2版本默认写入策略, 保证应用程序中的消息数据有顺序的存储到同一个分区中,第一条消息写入到哪个分区, 剩余消息都写入和第一条消息相同的分区中

  • 第一步创建df对象
# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()# 创建df对象
df = spark.createDataFrame(data=[[1, '小明', 18, '男'],[2, '小红', 16, '女'],[3, '张三', 22, '男'],[4, '李四', 20, '男']], schema='id int, name string, age int, gender string')
df.show()
  • 将df的数据进行拼接,重命名为value,kafka中的消息数据是存储在value字段中的
new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
new_df.show()
  • 第三步,写入到kafka中

df.write.save(format='kafka', mode='append', **options)

options = {'kafka.bootstrap.servers': 'node1:9092, node2:9092',# 指定写入的主题, 注意参数名, 和消费者的参数名不一样'topic': 'itheima'
}
new_df.write.save(format='kafka', mode='append', **options)
# 当前应用程序是生产者(发布者)
# 粘性写入策略: 默认策略, 第一条消息写入到哪个分区, 剩余消息都写入和第一条消息相同的分区中
from pyspark.sql import SparkSession
from pyspark.sql import functions as F# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()# 创建df对象
df = spark.createDataFrame(data=[[1, '小明', 18, '男'],[2, '小红', 16, '女'],[3, '张三', 22, '男'],[4, '李四', 20, '男']], schema='id int, name string, age int, gender string')
df.show()# 将df的数据保存到kafka中
# 注意点: 将df的数据保存到value字段中, kafka中的消息数据是存储在value字段中的
new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
new_df.show()
options = {'kafka.bootstrap.servers': 'node1:9092, node2:9092',# 指定写入的主题, 注意参数名, 和消费者的参数名不一样'topic': 'itheima'
}
new_df.write.save(format='kafka', mode='append', **options)

5.2.2 指定partition写入策略

指定partition写入策略: 创建partition字段, 字段名:partition 字段值:分区编号
根据字段的值将消息数据存储到对应的分区中, 消息数据存储分布均匀

只需要改动,df 拼接这一步骤,判断不同的值,为不同的分区编号即可

字段名一定是partition

new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'),F.when(F.col('gender') == "男", 0).when(df['gender'] == '女', 1).otherwise(2).alias('partition'))

5.2.3 指定key写入策略

指定key写入策略: 创建key字段, 字段名:key 字段值:值可以是某列的值
hash(字段值)%分区数=结果值…余数
根据字段的值进行哈希取余计算(余数=分区编号), 将余数相同的消息数据存储到相同的分区中, 消息数据存储分布均匀

只需要改动,df 拼接这一步骤,新添加一列

字段名一定是key

new_df = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'),df['gender'].alias('key'))

5.3 读取写入演示

  1. 启动写入数据的程序

在这里插入图片描述
在这里插入图片描述
2. 启动读取数据程序代码
在这里插入图片描述

6. python操作kafka

6.1 读取消息数据

  • 创建消费者对象

consumer = KafkaConsumer(‘主题名’,bootstrap_servers=‘node1:9092’)

# 消费者应用程序
# 需要借助kafka-python第三方库, 需要安装 pip install kafka-python
from kafka import KafkaConsumer# 创建消费者对象
consumer = KafkaConsumer('itheima', bootstrap_servers='node1:9092')
print(consumer)
# 遍历消息,此时应用程序会一直阻塞(死循环),等待接受消息
for msg in consumer:# ConsumerRecord()类的对象print(msg)print(type(msg))# 获取对象属性, bytes字节类型print(msg.value)# 字符串类型print(msg.value.decode('utf-8'))

6.2 写入消息数据

  • 创建生产者对象

producer = KafkaProducer(bootstrap_servers=[‘node1:9092’])
producer.send(topic=‘主题名’, value=‘写入的数据’.encode(‘utf-8’))

# 生产者应用程序
# 需要借助kafka-python第三方库, 需要安装 pip install kafka-python
from kafka import KafkaProducer# 创建生产者对象
producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 调用producer对象的send方法, 发送消息
# str.encode('utf-8'): 将字符串类型转换成字节bytes类型
# bytes.decode('utf-8'): 将字节bytes类型转换成字符串类型
producer.send(topic='itheima', value='hello world'.encode('utf-8'))# 关闭生产对象
producer.close()

6.3 演示操作

  • 先启动读取数据的程序
    在这里插入图片描述
  • 启动写入数据的程序
    在这里插入图片描述
  • 再次查看读取数据的控制台
    在这里插入图片描述

7. kafka消息数据存储机制

7.1 kafka消息数据存储介绍

Apache Kafka 是一个分布式流处理平台,它提供了一种高效、可靠的方式来发布和订阅消息。Kafka 的消息存储机制是其核心特性之一,下面将详细介绍 Kafka 如何存储消息。

Kafka 存储架构概述

Kafka 的消息存储在磁盘上,并以一种高效的文件格式进行组织。每条消息都归属于一个特定的主题(Topic),每个主题可以被划分为多个分区(Partition),每个分区对应一个有序的消息序列。

主题(Topic)

  • 定义:主题是 Kafka 中消息分类的逻辑单位。每个主题可以有多个分区。
  • 分区:主题中的每个分区都是一个有序的消息队列。

分区(Partition)

  • 定义:分区是物理存储的基本单位,每个分区对应一个文件夹,该文件夹下存储着一系列的消息文件。
  • 复制因子:为了提高可用性和容错能力,每个分区可以有多个副本(Replica),这些副本分布在不同的 Broker 上。

消息存储

  • 消息文件:每个分区的消息存储在一个或多个消息文件中,这些文件按照时间顺序被分成多个段(Segment)。
  • 段文件:每个段文件都有一个唯一的名称,表示其在时间轴上的位置。例如,一个段文件可能命名为 00000000000.offset,其中 00000000000 表示该段的起始偏移量。
  • 索引文件:为了快速定位消息,每个段文件都有一个对应的索引文件,它记录了消息的位置信息。

存储细节

  • 文件格式:消息文件使用二进制格式存储,以提高读写效率。
  • 偏移量:每条消息都有一个全局唯一的偏移量,用于标识消息的位置。偏移量是在分区内的唯一标识符。
  • 保留策略:Kafka 支持两种消息保留策略:基于时间的保留和基于大小的保留。可以根据需要配置保留策略,以决定何时删除旧消息。

总结

  • 高吞吐量:Kafka 的消息存储设计使其能够实现高吞吐量和低延迟。
  • 持久化:消息存储在磁盘上,即使 Kafka Broker 故障也能保证数据不丢失。
  • 容错性:通过复制因子和分区副本,Kafka 提供了高度的容错性。

7.2 存储文件

每个段文件包含【.index、.timeindex、.log】三个文件, 每个段文件默认存储数据量为1G, 超过1G会创建新的segment段文件, 存储时间为168小时

消息数据是以二进制形式存储在.log文件中,需要通过特定的kafka指令查看数据

# --files: log文件名
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log

您提到的关于 Kafka 段文件的描述是正确的。Kafka 的每个段文件确实包含 .index.timeindex.log 三个文件,并且默认情况下,每个段文件的最大大小为 1GB。当达到这个大小限制时,Kafka 会自动创建一个新的段文件。此外,您提到的存储时间为 168 小时,这通常是指基于时间的消息保留策略。

下面是对 Kafka 段文件及其相关文件的详细说明:

段文件组成
每个段文件由以下三个文件组成:

  1. .log 文件:这是主要的消息文件,包含了所有消息的数据。消息是以二进制格式存储的,这使得读写效率非常高。
  2. .index 文件:这是索引文件,记录了消息的位置信息。它允许快速定位到消息在 .log 文件中的位置。
  3. .timeindex 文件:这是时间索引文件,记录了消息的时间戳和位置信息。它允许根据时间戳快速定位消息。

段文件的命名
段文件的命名规则如下:

  • .log 文件的命名格式为:<start_offset>.log,例如 00000000000.log
  • .index 文件的命名格式为:<start_offset>.index,例如 00000000000.index
  • .timeindex 文件的命名格式为:<start_offset>.timeindex,例如 00000000000.timeindex

段文件的大小限制
默认情况下,每个段文件的最大大小为 1GB。当段文件达到这个大小时,Kafka 会自动创建一个新的段文件,并将后续写入的消息存储在这个新的段文件中。这可以通过配置 log.segment.bytes 参数来更改。

消息保留策略
Kafka 支持两种消息保留策略:

  1. 基于时间的保留:可以配置消息保留的时间长度。例如,您可以设置消息保留 168 小时(7 天),这意味着超过 168 小时的消息会被删除。
  2. 基于大小的保留:可以配置消息保留的总大小。当达到设定的大小限制时,较旧的消息会被删除。

总结

  • 高效存储:Kafka 使用 .log 文件以二进制格式高效地存储消息。
  • 快速定位.index.timeindex 文件使得根据偏移量或时间戳快速定位消息成为可能。
  • 灵活的保留策略:可以通过配置灵活地设置消息的保留时间或大小。

7.3 修改段文件的配置

配置信息存储在kafka/config/server.properties文件中

# 指定存储数据路径
log.dirs=/export/server/kafka/data
# 指定数据的保存时间 单位:小时
log.retention.hours=168
# 指定存储数据的文件最大存储空间
log.segment.bytes=1073741824

8. kafka读写流程

8.1 写入消息流程

  • 基本写入流程
    • 生产者应用程序将消息写入到leader副本中
    • leader副本将写入的消息同步给follower副本
  • 多副本消息数据写入流程
    • 生产者应用程序将消息写入到leader副本中
    • leader副本将写入的消息根据ISR列表的副本编号顺序同步给follower副本
  • 多条消息数据写入时间间隔(如何保证消息数据不丢失)
    • 每条消息写入后都是有一个时间间隔, 可以进行设置
    • ack应答机制
      • 0: 不管当前条消息是否成功写入到leader副本, 直接写入下一条消息
      • 1: 默认机制, 保证当前条消息成功写入到leader副本后, 再写入下一条消息
      • -1或all: 保证当前条消息成功写入到leader和follower副本, 再写入下一条消息

在这里插入图片描述

8.2 读取消息流程

  • 单分区读取

    • 根据偏移量读取消息数据
    • 将消费过的消息元数据保存到 __consumer__offsets 主题下
      • 主题名称, 分区编号, 偏移量
    • 当前消费者对应当前分区
  • 多分区读取

    spark操作kafka, spark会自行进行资源分配, 分区数由executor执行器创建task线程进行处理, 设置了多少个executor, 当前订阅者(消费者组)就有多少个消费者

    • 分区数=消费者数:一个消费者对应一个分区, 不能读取其他分区消息, 负载均衡
    • 分区数>消费者数:采用轮询方式, 读取完第一个分区的第一条数据后再读取第二个分区的第一条数据
    • 分区数<消费者数:部分消费者不工作, 造成资源浪费问题

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/397547.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rocketMQ5.0事务消息实战

事务消息逻辑 docker部署容器&#xff0c;并且创建消息 dd首先我们来docker 部署rocketMQ与rocketMQDashBoard docker ps查看rocketMQ 容器名称 docker ps 进入容器内部 docker exec -it rmqnamesrv /bin/bash 创建事务消息 MeessageType: TRANSACTION sh mqadmin upda…

Linux驱动.之I2C,iic驱动层(二)

一、 Linux下IIC驱动架构 本篇只分析&#xff0c;一个整体框架。 1、首先说说&#xff0c;单片机&#xff0c;的i2c硬件接口图&#xff0c;一个i2c接口&#xff0c;通过sda和scl总线&#xff0c;外接了多个设备device&#xff0c;通过单片机&#xff0c;来控制i2c的信号发生&…

解锁数据“智能”背后的秘密

在这个被数据洪流包围的时代&#xff0c;每一秒都有无数的信息在生成、传递、分析。但你是否曾好奇&#xff0c;这些数据是如何从简单的数字、文字转化为推动社会进步、改变生活方式的“智能”力量的&#xff1f;今天&#xff0c;就让我们一起揭开数据“智能”背后的神秘面纱&a…

C语言文达学院班级管理系统-计算机毕业设计源码03499

摘 要 本文阐述了一个C语言文达学院班级管理系统的设计与实现过程。该系统充分利用ASP.NET的轻量级、灵活性和可扩展性&#xff0c;旨在为文达学院提供高效、便捷的班级管理系统。通过详细的需求分析、技术选型、系统设计、开发实现、测试与调试以及部署与上线等步骤&#xff0…

通过python管理mysql

打开防火墙端口&#xff1a; 使用 firewall-cmd 命令在防火墙的 public 区域中永久添加 TCP 端口 7500&#xff08;FRP 控制台面板端口&#xff09;、7000&#xff08;FRP 服务端端口&#xff09;以及端口范围 6000-6100&#xff08;一组客户端端口&#xff09;。这些端口是 FR…

Unity补完计划 之 动态控制TileMap

本文仅作笔记学习和分享&#xff0c;不用做任何商业用途 本文包括但不限于unity官方手册&#xff0c;unity唐老狮等教程知识&#xff0c;如有不足还请斧正 1.TileMap &TileBase Unity - Scripting API: Tilemap &#xff0c;看手册内容太多了故介绍几个常用的公共方法 首…

【凌鸥学园】电机电控课程学习,挑战自我!

电控达人集结号&#xff01;凌鸥学园精心打造的电机电控课程&#xff0c;现面向全体爱好者及专业人士免费开放&#xff01; 课程内容从基础原理到高级应用&#xff0c;全方位助力你快速掌握电机电控精髓&#xff0c;实现技能飞跃&#xff01; 挑战成功&#xff0c;双重奖励等…

12. 矩阵中的路径

comments: true difficulty: 中等 edit_url: https://github.com/doocs/leetcode/edit/main/lcof/%E9%9D%A2%E8%AF%95%E9%A2%9812.%20%E7%9F%A9%E9%98%B5%E4%B8%AD%E7%9A%84%E8%B7%AF%E5%BE%84/README.md 面试题 12. 矩阵中的路径 题目描述 给定一个 m x n 二维字符网格 board…

Java的反射原理

反射允许程序在运行时检查或修改其类、接口、字段和方法的行为。反射主要通过java.lang.reflect包中的类和接口实现&#xff0c;它主要用于以下目的&#xff1a; 在运行时分析类的能力&#xff1a;通过反射&#xff0c;可以在运行时检查类的结构&#xff0c;比如它的方法、构造…

【RAG检索增强生成】Ollama+AnythingLLM本地搭建RAG大模型私有知识库

目录 前言一、Ollama&#xff1a;革新性的本地LLM服务工具1.核心优势2.技术亮点 二、AnythingLLM 概览1.核心特性2.技术生态支持 三、搭建本地智能知识库1. Ollama的安装启航2. AnythingLLM的安装对接3. AnythingLLM的配置精调4. 工作区与文档管理5. 聊天与检索的智能交互 四、…

计算机毕业设计 校园失物招领网站 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

封装el-table 基于element封装可配置JSON表格组件

基于element封装可配置JSON表格组件 话不多说直接贴代码&#xff0c;复制运行即可查看效果 子组件全部代码 <template><div class"custom-table"><el-table:data"tableData"borderstyle"width: 100%"size"mini"max-h…

负载均衡之HAProxy超全内容!!!

一、负载均衡 1.1 负载均衡概念 负载均衡&#xff08;Load Balance&#xff0c;简称 LB&#xff09;是高并发、高可用系统必不可少的关键组件&#xff0c;目标是尽力将网络流量平均分发到多个服务器上&#xff0c;以提高系统整体的响应速度和可用性。 1.2 软件负载均衡 软件…

做报表用什么工具?不想再用Excel了!!!

一、什么是中国式报表&#xff1f; 不知道大家现在还是使用Excel来制作报表&#xff0c;然后跟领导汇报工作吗&#xff1f;虽然Excel功能很强大&#xff0c;但是用Excel做过中国式报表的小伙伴一定知道它的制作过程有多复杂。 中国式报表可以用一句话简单概括&#xff1a;格式…

Mozilla Firefox侧边栏和垂直标签在131 Nightly版本中开始试用

垂直选项卡和全新的侧边栏体验现已在Mozilla Firefox Nightly 131 中提供。这一更新备受社区期待和要求&#xff0c;我们期待看到它如何提高您的浏览效率和工作效率。如果您想体验一下这项正在进行中的工作&#xff0c;请这样操作&#xff1a; 更新到最新的Nightly版 转到设置…

uniapp本地打包app安装说明

uniapp本地打包app安装说明 目录 uniapp本地打包app安装说明一、打包说明1.HBuilder X 生成本地打包资源2.Android Studio和App离线SDK环境准备2.1 下载Android Studio和 App离线SDK2.2 资源替换2.3 id属性值修改。2.4 添加provider信息到AndroidManifest.xml中的<applicati…

使用Hugging Face构建大型语言模型应用

在本文中&#xff0c;我们将介绍如何使用Hugging Face的大型语言模型&#xff08;LLM&#xff09;构建一些常见的应用&#xff0c;包括摘要&#xff08;Summarization&#xff09;、情感分析&#xff08;Sentiment analysis&#xff09;、翻译&#xff08;Translation&#xff…

Leetcode JAVA刷刷站(14)最长公共前缀

一、题目概述 二、思路方向 在Java中&#xff0c;要编写一个函数来查找字符串数组中的最长公共前缀&#xff0c;我们可以遵循以下步骤&#xff1a; 处理边界条件&#xff1a;如果数组为空或长度为0&#xff0c;直接返回空字符串。初始化最长公共前缀&#xff1a;将数组的第一个…

HarmonyOS 3.1/4.0应用升级到HarmonyOS NEXT改动点

在 “2024鸿蒙零基础快速实战-仿抖音App开发&#xff08;ArkTS版&#xff09;”&#xff08;https://coding.imooc.com/class/843.html&#xff09;视频课程中&#xff0c;因为讲师在该课程授课时是使用的HarmonyOS 3.1/4.0应用&#xff08;API 9&#xff09;&#xff0c;如果部…

在亚马逊云科技上搭建云原生生成式AI教育学习平台

项目简介&#xff1a; 小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案&#xff0c;帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践&#xff0c;并应用到自己的日常工作里。 本次介绍的是如何利用亚马逊云科技大模型托…