本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
文章目录
- 1 安装部署
- 1.1 集群规划
- 1.2 集群部署
- 2 命令行操作
- 2.1 主题
- 2.2 生产者
- 2.3 消费者
- 2.4 消费者组
1 安装部署
1.1 集群规划
这里采用的是三节点的kafka集群,名称为hadoop102、hadoop103、hadoop104
1.2 集群部署
官方下载地址:http://kafka.apache.org/downloads.html
- 解压安装包
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
- 修改解压后的文件名称
mv kafka_2.12-3.0.0/ kafka
- 进入到/opt/module/kafka 目录,修改配置文件
按需修改以下内容:cd config/vim server.properties
#broker 的全局唯一编号,不能重复,只能是数字(一般修改这个)。 broker.id=0 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔。(一般修改这个) log.dirs=/opt/module/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)(一般修改这个) zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
- 另外两个节点也装安装包
- 分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
注:broker.id 不得重复,整个集群中唯一。 - 配置环境变量
- 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
增加如下内容:sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
- 刷新一下环境变量。
source /etc/profile
- 其他节点一样的操作。
在另外两个节点 source刷新source /etc/profile
- 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
- 启动集群
先启动 Zookeeper 集群,然后启动 Kafka。zk.sh start kafka-server-start.sh -daemon config/server.properties
- 关闭集群
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。bin/kafka-server-stop.sh
2 命令行操作
2.1 主题
1 查看操作主题命令参数
bin/kafka-topics.sh + 参数
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–create | 创建主题。 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题。 |
–describe | 查看主题详细描述。 |
–partitions <Integer: # of partitions> | 设置分区数。 |
–replication-factor<Integer: replication factor> | 设置分区副本。 |
–config <String: name=value> | 更新系统默认的配置。 |
操作时要先连接,所以后面的命令行都有--bootstrap-server hadoop102:9092
2 查看当前服务器中的所有 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3 创建一个topic,这里我起名为first
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic
定义topic名
--replication-factor
定义副本数
--partitions
定义分区数
4 查看 first 主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
5 修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
6 再次查看 first 主题修改后的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
7 删除 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2.2 生产者
1 查看生产者
bin/kafka-console-producer.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
2 发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello world
atguigu atguigu
2.3 消费者
1 查看消费者
bin/kafka-console-consumer.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–from-beginning | 从头开始消费。 |
–group <String: consumer group id> | 指定消费者组名称。 |
2 消费消息
- 消费指定主题的数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
- 把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
2.4 消费者组
1 查看消费者组
bin/kafka-consumer-groups.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–describe | 列出消费者组详情信息。 |
–list | 列出所有消费者。 |
–group <String: consumer group id> | 指定消费者组名称。 |
如果我们要查询testGroup消费者组的详细信息,执行如下命令
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group testGroup
其中列出的参数含义为:
参数 | 描述 |
---|---|
GROUP | 消费者组名 |
TOPIC | 主题名称 |
PARTITION | 该主题消息的分区ID列表 |
CURRENT-OFFSET | 最后被消费的消息的偏移量 |
LOG-END-OFFSET | 该主题最后一条消息的偏移量 |
LAG | 消息积压量 |
CONSUMER-ID | 该组消费者ID |
HOST | 该组消费者主机IP/brokerID |
CLIENT-ID | 该组消费者客户端ID |