【Kafka】Linux+KRaft集群部署指南
- 摘要
- 本地环境说明
- 官网
- 准备工作
- 快速开始
- 修改config/kraft/server.properties
- 初始化数据存储目录
- 新节点加入集群
- 启动
- 停止
- 测试
- 创建topic
- 创建生产者
- 创建消费者
摘要
Kafka
是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer
、Broker
、Consumer
,以及依赖的Zookeeper
集群。其中Zookeeper
集群是Kafka
用来负责集群元数据的管理、控制器的选举等。
用过kafka
的开发者应该知道,每次启动kafka
服务时,都是需要先把Zookeeper
启动,然后启动kafka
,步骤相当繁琐。
Kafka
在使用的过程当中,会出现一些问题。由于重度依赖Zookeeper
集群,当Zookeeper
集群性能发生抖动时,Kafka
的性能也会收到很大的影响。因此,在Kafka
发展的过程当中,为了解决这个问题,提供KRaft
模式3.0+
版本,来取消Kafka
对Zookeeper
的依赖。
Kafka
是依赖于JDK
的,需要先把java
环境配置一下。
本地环境说明
依赖 | 版本 |
---|---|
JDK | 21 |
Linux | Red Hat Enterprise Linux release 8.0 (Ootpa) |
Kafka | 3.9.0 |
官网
- 下载页面:
https://kafka.apache.org/downloads
3.9.0
下载地址:https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
- 官方文档:
https://kafka.apache.org/documentation/
准备工作
sudo sh -c 'cat >> /etc/hosts << EOF
10.34.88.110 kafka1
10.34.88.111 kafka2
10.34.88.112 kafka3
EOF'
快速开始
修改config/kraft/server.properties
# 表示此节点,既是broker又可以当controller
process.roles=broker,controller
# 节点id,不重名即可
node.id=1
# controller竞争者,也就是controller将从它们之中诞生(这里的kafka1是刚刚设置的本机的域名解析,或者直接写localhost也行)
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# 监听地址(也就是客户端连接时访问的地址)
advertised.listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
controller.listener.names=CONTROLLER
# kafka数据存放地址
log.dirs=data
advertised.listeners
参数里面的host
一定要是其它节点能够访问通的地址,不要写localhost
等回环地址,否则无法成功加入集群
初始化数据存储目录
cd /opt/elk/kafka_2.13-3.9.0
# 生成uuid,后面需要用
./bin/kafka-storage.sh random-uuidKAFKA_CLUSTER_ID=n7M-O9yUTi6DqJoFbopyUg
KAFKA1_UUID=DdF3W7v0RviSZj7k4H9yJw
KAFKA2_UUID=XZDIN4GzScmnfhjsYwMt3g
KAFKA3_UUID=HwsQUApfSbenMYBWS7j_-A
# 每个节点都需要运行
# If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your controllers using the --feature kraft.version=1. (Note that you should not supply this flag when formatting brokers -- only when formatting controllers.)
./bin/kafka-storage.sh format --feature kraft.version=1 --cluster-id ${KAFKA_CLUSTER_ID} \--initial-controllers "1@kafka1:9093:${KAFKA1_UUID},2@kafka2:9093:${KAFKA2_UUID},3@kafka3:9093:${KAFKA3_UUID}" \--config config/kraft/server.properties
新节点加入集群
./bin/kafka-storage.sh format --feature kraft.version=1 --cluster-id ${KAFKA_CLUSTER_ID} --config config/kraft/server.properties --no-initial-controllers
启动
# 格式化完毕后,可以启动节点了(守护进程启动加-daemon 参数)。
mkdir -pv logs
nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties >logs/server.log 2>&1 & echo \$! > kafka.pid
tail -fn200 logs/server.log
停止
kill -9 \$(cat kafka.pid)
测试
创建topic
./bin/kafka-topics.sh --create --topic kafka-test --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
创建生产者
./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka-test
创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka-test --from-beginning