1、下载安装
Kafka下载地址:Apache Kafka
下载文件
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
文件解压缩
tar -zxvf kafka_2.12-3.8.0.tgz
进入目录
cd kafka_2.12-3.8.0
2、Zookeeper 配置
2.1、修改 Zookeeper 配置文件 config/zookeeper.properties
编辑 zookeeper 配置文件
vim config/zookeeper.properties
2.2、Zookeeper 配置文件修改内容
log.dirs=/home/paastest/component/kafka_2.12-3.8.0/data
dataDir=/home/paastest/component/kafka_2.12-3.8.0/zookeeper/data
dataLogDir=/home/paastest/component/kafka_2.12-3.8.0/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
主要是以下三行
# 身份验证提供程序
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider# 需要客户端身份验证方案
requireClientAuthScheme=sasl# jaas登录续订
jaasLoginRenew=3600000
2.4、Zookeeper 配置JAAS文件
配置JAAS文件
vi config/zookeeper_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin1234”
user_admin=“admin1234”
};
2.5、Zookeeper 修改启动脚本 bin/zookeeper-server-start.sh
编辑启动脚本
vim bin/zookeeper-server-start.sh
倒数第二行增加如下内容
export KAFKA_OPTS=“-Djava.security.auth.login.config=/home/paastest/component/kafka_2.12-3.8.0/config/zookeeper_jaas.conf”
3、Kafka 配置
3.1、修改 Kafka 配置文件 config/server.properties
编辑 Kafka 配置文件
vim config/server.properties
listeners=SASL_PLAINTEXT://xxx:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
#Listener name, hostname and port the broker will advertise to clients.
#If not set, it uses the value for "listeners".
advertised.listeners=SASL_PLAINTEXT://r-apm-1:9092
其中xxx是机器的ip,r-apm-1是我的主机名(linux命令行输hostname可得),这里是为了对外提供访问服务,因此/etc/hosts也需要配置映射:
33.0.0.241 r-apm-1 r-apm-1
3.3、Kafka 配置JAAS文件
配置JAAS文件
vi config/kafka_server_jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin1234"user_admin="admin1234"user_alice="1234";
};
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin1234";
};
这里要注意user_xxx=yyy,xxx是用户名,yyy是密码
3.4、Kafka 修改启动脚本 bin/kafka-server-start.sh
编辑启动脚本
vim bin/kafka-server-start.sh
倒数第二行增加如下内容
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/paastest/component/kafka_2.12-3.8.0/config/kafka_server_jaas.conf"
4、启动 Zookeeper
执行启动 Zookeeper 命令
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
5、启动 Kafka
执行启动 kafka 命令
./bin/kafka-server-start.sh -daemon config/server.properties
6、验证
这里为了验证broker是否能正常对外提供服务,我们使用producer和consumer来验证,由于kafka配置了加密,因此我们需要加上加密参数:
6.1、配置producer
配置密钥
vi config/kafka_client_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin1234";
};
配置密钥
vi config/producer.properties
在最后加上
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
配置启动参数
vi bin/kafka-console-producer.sh
倒数第二行添加如下命令:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/paastest/component/kafka_2.12-3.8.0/config/kafka_client_jaas.conf"
6.2、配置consumer
配置密钥
跟producer是一样的配置。上面配置了就不用再配
vi config/kafka_client_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin1234";
};
配置密钥
vi config/consumer.properties
在最后加上
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
配置启动参数
vi bin/kafka-console-consumer.sh
倒数第二行添加如下命令:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/paastest/component/kafka_2.12-3.8.0/config/kafka_client_jaas.conf"
6.3、启动生产者
bin/kafka-console-producer.sh --broker-list ip:9092 --topic test --producer.config config/producer.properties
在出现的界面输入要生产的字符按回车即可发送
6.3、启动消费者
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config config/consumer.properties
发现能消费到上一步生产的字符,即表示成功
7、客户端代码配置
在构建的配置属性中加入security.protocol,sasl.mechanism,sasl.jaas.config即可,其他的配置跟普通版一样。
Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+KafkaConfig.of().getUsername()+"\" password=\""+KafkaConfig.of().getPassword()+"\";");
producer = new KafkaProducer<>(props);
consumer = new KafkaConsumer<>(props);
8、有用的命令
创建topic
./kafka-topics.sh --create --topic test --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1
检查broker是否正常
bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
生产demo
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --producer.config config/producer.properties
消费demo
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config config/consumer.properties
zookeeper客户端
bin/zookeeper-shell.sh localhost:2181
查看集群broker节点信息
[id]是具体的节点id,根据实际情况替换
ls /brokers/ids/[id]
get /brokers/ids/[id]
查看消费组信息
[id]是具体的节点id,根据实际情况替换
ls /consumers/[id]
查看主题信息
[topic]是具体的topic名称,根据实际情况替换
ls /config/topics/[topic]
get /config/topics/[topic]
9、注意事项
如果配置后还是不行,请停止kafak和zookeeper程序,并清理kafka和zookeeper的数据目录后,再检查一遍配置是否都配置对了,不然旧数据目录会对重新启动的产生影响。
kafak数据目录在config/server.properties:
log.dirs=/home/paastest/component/kafka_2.12-3.8.0/data
zookeeper在config/zookeeper.properties:
dataDir=/home/paastest/component/kafka_2.12-3.8.0/zookeeper/data
dataLogDir=/home/paastest/component/kafka_2.12-3.8.0/zookeeper/log