目录
引言
Kafka 的核心概念和组件
Kafka 的主要特性
使用场景
申请云服务器
安装docker及docker-compose
VSCODE配置
开发环境搭建
搭建Kafka的python编程环境
Kafka的python编程示例
引言
Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。
Kafka 的核心概念和组件
-
Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。
-
Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。
-
Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。
-
Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。
-
Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。
-
Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。
-
Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。
Kafka 的主要特性
- 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
- 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
- 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
- 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
- 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
- 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
- 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
- 日志收集:从分布式系统中收集日志数据,用于监控和分析。
- 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
- 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。
申请云服务器
(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)
注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04
管理员账户root
管理员密码:在安装的时候设置,记住密码
下载安装mobaXterm
https://mobaxterm.mobatek.net/download-home-edition.html
安装docker及docker-compose
#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试
查看docker和dockers是否安装好?
docker version
docker-compose version
VSCODE配置
开发环境搭建
-
将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka
-
将下面代码中的localhost替换为云服务公网IP
version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
-
上传metaldigi.zip
解压缩文件
sudo apt updatesudo apt install zipunzip metaladigi.zip
cd metaldigi/kafka
-
启动kafka消息服务器
-
在命令行执行消息生产者
-
docker-compose up -d
搭建Kafka的python编程环境
-
进入metaldigi文件夹
-
执行 docker ps
cd metaldigi
docker-compose up -d
Kafka的python编程示例
-
进入metaldigi文件夹
-
执行 docker ps
docker exec -it metal-digi-backend bash
消息生产者
from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。
消息消费者
from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic', # 指定要消费的主题bootstrap_servers='150.158.11.142:9092', # Kafka 服务器地址和端口auto_offset_reset='earliest', # 从最早的消息开始消费enable_auto_commit=True, # 自动提交偏移量group_id='demo-group' # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")