Kafka3.4 SASL/kerberos ACL 证以及 SSL 加密连接
序
前面我们使用 kafka3.3.1 on zookeeper
的模式进行多网段监听的 kafka
集群,顺便搭建起 kafkaui
后发现一些问题,我们 kafka
集群没有连接认证,万一谁知道了我们的 kafka
连接地址,岂不是随随便便就能消费数据、清空数据、胡乱修改数据了吗?
所以本章节进行认证连接的搭建,参考官网:
https://kafka.apache.org/documentation/#security
认证类型
如果对 Kafka
连接协议有疑问的,请优先查看 Kafka SASL ACL SSL 分别代表什么意思。
测试环境
版本信息
Kafka 3.4
CDH 6.3.2
Zookeeper 3.4.5
Krb5kdc 已经启动
服务器规划信息
环境检查项
- 关闭
selinux
- 关闭防火墙
- 如有条件请配置好
ansible
,在本文中bigdata_cluster
代指udmax0[1-4].udmax.com
Krb5Kdc 安装配置
如果已经安装 krb5kdc
可忽略这部分内容。
在主节点 udmax01.udmax.com
安装 krb5 server
,使用如下命令:
yum install krb5-server krb5-libs krb5-auth-dialog krb5-workstation openldap-clients -y
如果速度过慢可以更改
yum
源。
其他节点只需要安装执行环境:
ansible bigdata_cluster -m shell -a 'yum install krb5-workstation krb5-libs -y '
更改 /var/kerberos/krb5kdc/kdc.conf
文件,如果不存在则创建:
[kdcdefaults]kdc_ports = 88kdc_tcp_ports = 88[realms]UDMAX.COM = {#master_key_type = aes256-ctsacl_file = /var/kerberos/krb5kdc/kadm5.acldict_file = /usr/share/dict/wordsadmin_keytab = /var/kerberos/krb5kdc/kadm5.keytabsupported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normalmax_life = 3dmax_renewable_life = 7d}
这里 reaml
叫做 UDMAX.COM
,后续大量地方会使用到。
更改 /etc/krb5.conf
文件:
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crtdefault_realm = UDMAX.COMdefault_cache_name = KEYRING:persistent:%{uid}[realms]UDMAX.COM = {kdc = udmax01.udmax.comadmin_server = udmax01.udmax.com}[domain_realm].udmax.com = UDMAX.COM
udmax.com = UDMAX.COM
需要注意的有:
default_realm
指向realms
中的一项realm
中的kdc
和admin_server
都指向我们的主节点。
配置完毕后分发文件:
ansible bigdata_cluster -m copy -a "src=/etc/krb5.conf dest=/etc/krb5.conf"
如果没有
ansible
,请使用scp
命令。
初始化 kdc database
:
kdb5_util create –r UDMAX.COM -s
Enter KDC database master key:
这里需要输入密码,在这篇文章中称之为
{{kdc_master_key}}
。
开启 krb5kdc
服务:
systemctl start krb5kdc
systemctl status krb5kdc
Kafka 3.4 安装
# 先安装scala
curl -fL https://github.com/coursier/coursier/releases/latest/download/cs-x86_64-pc-linux.gz | gzip -d > cs && chmod +x cs && ./cs setup# 或者离线安装包
wget https://downloads.lightbend.com/scala/2.12.17/scala-2.12.17.tgz
ansible bigdata_cluster -m shell -a "mkdir -p /usr/scala/"
ansible bigdata_cluster -m copy -a "src=scala-2.12.17.tgz dest=/usr/scala/scala-2.12.17.tgz"
ansible bigdata_cluster -m shell -a "cd /usr/scala/; tar -zxvf scala-2.12.17.tgz"# 环境变量
vim /etc/profile.d/scala.sh
export SCALA_HOME=/usr/scala/scala-2.12.17
export SCALA_PATH=${SCALA_HOME}/bin
export PATH=${PATH}:${SCALA_PATH}ansible bigdata_cluster -m copy -a "src=/etc/profile.d/scala.sh dest=/etc/profile.d/scala.sh"
解压离线安装包,规划安装路径:
mkdir /component/kafka/
cd /component/kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
tar -zxvf kafka_2.12-3.4.0.tgz
注册 Kafka 的 kerberos 用户
# 创建kafka用户
addprinc -randkey kafka/udmax01@UDMAX.COM
addprinc -randkey kafka/udmax02@UDMAX.COM
addprinc -randkey kafka/udmax03@UDMAX.COM
addprinc -randkey kafka/udmax04@UDMAX.COM# 导出票据,可以导到一个文件中,或者分开也行
xst -norandkey -k /etc/security/keytab/kafkaX.keytab kafka/udmax01@UDMAX.COM
xst -norandkey -k /etc/security/keytab/kafkaX.keytab kafka/udmax02@UDMAX.COM
xst -norandkey -k /etc/security/keytab/kafkaX.keytab kafka/udmax03@UDMAX.COM
xst -norandkey -k /etc/security/keytab/kafkaX.keytab kafka/udmax04@UDMAX.COM
此时生成的 kafkaX.keytab
文件需要分发到各个服务器上:
ansible bigdata_cluster -m copy -a "src=/etc/security/keytab/kafkaX.keytab dest=/etc/security/keytab/"
在 9092 端口开启 SASL_PLAINTEXT
认证
首先,我们可以先配置一个 SASL_PLAINTEXT
认证,这里采用 SASL/GSSAPI
进行,主体就是刚刚生成的 kafka
。
配置文件更改
conf/server.properties
备份相关文件
# 备份
cd /component/kafka/kafka_2.12-3.4.0/config
cp server.properties server.properties.backup
修改 server.properties
中的内容:
broker.id=001102
reserved.broker.max.id=255255
listeners=SASL_PLAINTEXT://udmax02.udmax.com:9092
advertised.listeners=SASL_PLAINTEXT://udmax02.udmax.com:9092
inter.broker.listener.name=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafkaauto.create.topics.enable=false
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/component/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=udmax02.udmax.com:2181,udmax03.udmax.com:2181,udmax04.udmax.com:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
注意 listeners
中为 Kafka broker
监听的地址,这里监听了端口 9092
,协议使用的为 SASL_PLAINTEXT
。而 sasl.mechanism.inter.broker.protocol
指定的 SASL
使用 GSSAPI
。那么对应的 SASL
相关配置为:
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
这里表明使用
SASL/GSSAPI(Kerberos)
进行身份验证,并且采用的principal
名为kafka
conf/kerberos/krb5_kafka.conf
该文件以及目录不存在,需要创建:
# 将krb5.conf 抽离一个出来,主要是某些krb5.conf可能有问题(特别是CDH接管后)。
cp /etc/krb5.conf conf/kerberos/krb5_kafka.conf# 主要的内容如下:
[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[kdc]profile = /var/kerberos/krb5kdc/kdc.conf[libdefaults]dns_lookup_realm = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = truerdns = falsepkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crtdefault_realm = UDMAX.COMdefault_ccache_name = KEYRING:persistent:%{uid}storeKey=false[realms]
UDMAX.COM = {kdc = udmax01.udmax.comadmin_server = udmax01.udmax.com}[domain_realm].udmax.com = UDMAX.COM
udmax.com = UDMAX.COM
conf/kerberos/client.properties
用于在服务器环境内进行连接,因为当前需要连接 kafka
集群,所以都需要认证。
我们开启了端口 9092
,为 SASL_PLAINTEXT
,所以连接时也需要配置对应的连接认证,GSSAPI
对应的就是 com.sun.security.auth.module.Krb5LoginModule
认证:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \useKeyTab=true \keyTab="/etc/security/keytab/kafkaX.keytab" \principal="kafka/udmax02.udmax.com@UDMAX.COM" \renewTGT=true \storeKey=false \useTicketCache=true;
这里使用的 principal
为最高权限的 kafka
用户,一般来说应当设置为租户。比如这样的配置:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \useKeyTab=true \keyTab="/etc/security/keytab/udmax.keytab" \principal="udmax@UDMAX.COM" \renewTGT=true \storeKey=false \useTicketCache=true;
注意 sasl.kerberos.service.name
应当是连接 kafka broker
的名称,也是 server.properties
中的 sasl.kerberos.service.name
。
conf/kerberos/kafka_server_jaas.conf
当 kafka broker
启动的时候,会根据该文件获取自身的 SASL
信息,可以理解为当 Kafka
集群启动时,他们也需要互相认证对方是否合法。常规内容如下:
KafkaServer {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falsekeyTab="/etc/security/keytab/kafkaX.keytab"principal="kafka/udmax02.udmax.com@UDMAX.COM";
};KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falseprincipal="kafka/udmax02.udmax.com@UDMAX.COM"keyTab="/etc/security/keytab/kafkaX.keytab";
};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falseprincipal="kafka/udmax02.udmax.com@UDMAX.COM"keyTab="/etc/security/keytab/kafkaX.keytab";
};
请注意每个 kafka broker
的 principal
应当与自己的服务器名 hostname
一致。
Client {}
中的内容为 kafka broker
连接 zookeeper
时需要的认证信息,如果 zookeeper
未开启认证,则不需要相关配置。值得注意的是作者在测试时发现CDH 6.3.2中的Zookeeper显示开启认证,实际情况并没有。
执行文件配置
到此我们相关的配置已经配置完毕,但是常用的启停程序还需要统一更改。
涉及到的启动程序包括:
- bin/kafka-server-start.sh
- bin/kafka-console-consumer.sh
- bin/kafka-console-producer.sh
- bin/kafka-topics.sh
- bin/kafka-acls.sh
其实就是我们会在环境中使用到的 shell
都需要进行更改,更改的方式也很简单在上述文件的倒数第二行加上我们设定的环境变量:
export KAFKA_OPTS="-Dzookeeper.sasl.client=ture -Dzookeeper.sasl.client.username=kafka -Djava.security.krb5.conf=/component/kafka/kafka_2.12-3.4.0/config/kerberos/krb5_kafka.conf -Djava.security.auth.login.config=/component/kafka/kafka_2.12-3.4.0/config/kerberos/kafka_server_jaas.conf"
如果 zookeeper 未开启 sasl 请将
-Dzookeeper.sasl.client
设置为false
。
此时如果 SASL_PLAINTEXT
已经足够,则不需要继续开启 SSL
认证,跳到 启动 Kafka
即可。
在 9094 端口开启 SASL_SSL
认证
在刚刚的配置中,已经使用 SASL/GSSAPI
完成了身份验证,现在添加 SSL
传输加密。
在 SSL
中,我们还需要生成证书以及两个 jks
文件。
生成证书并配置
创建文件夹进行操作
ansible bigdata_cluster -m shell -a " mkdir -p /opt/ca "
cd /opt/ca
每个 broker 生成 server.keystore.jks
先在每台服务器生成 server.keystore.jks
文件,使用命令:
keytool -keystore server.keystore.jks \
-alias udmax01 \
-validity 36500 \
-genkey \
-keypass {{ssl.key.password}} \
-keyalg RSA \
-dname "CN=udmax01.udmax.com,OU=yd,O=yd,L=beijing,S=beijing,C=cn" \
-ext SAN=IP:{{内.网.i.p}},IP:{{外.网.i.p}},DNS:udmax01.udmax.com,DNS:udmax01 \
-storepass {{ssl.keystore.password}}
注意:
-dname
中的CN
要与每台服务器名hostname
一致-ext
中对应ip
以及DNS
最好写全,{{内.网.i.p}}
与{{外.网.i.p}}
是代指如192.168.1.100
。-keypass
与-storepass
在后续需要用到,本文使用{{ssl.key.password}}
和{{ssl.keystore.password}}
进行代指。
在主节点生成 CA 证书
通过 openssl
生成证书,如果没有可以进行安装:
yum install openssl -y
使用如下命令创建 CA 证书:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 36500 \
-passin pass:{{openssl_password}} -passout pass:{{openssl_password}} \
-subj "/C=CN/ST=Beijing/L=Beijing/O=yd/CN=ud"
注意:
- pass 中的密码也被
{{openssl_password}}
代替。
生成后进行分发文件:
ansible bigdata_cluster -m copy -a " src=ca-cert dest=/opt/ca/ "
ansible bigdata_cluster -m copy -a " src=ca-key dest=/opt/ca/ "
ansible bigdata_cluster -m shell -a " ls -ltr /opt/ca "
此时的 /opt/ca
目录中有如下文件:
途中 old 文件可以忽略
每个 broker 通过 CA 证书创建客户端信任证书
# 创建 client.truststore.jks
keytool -keystore client.truststore.jks -alias UdmaxCA -import -file ca-cert -storepass {{ssl.keystore.password}} -keypass {{ssl.key.password}}# 创建 server.keystore.jks
keytool -keystore server.keystore.jks -alias udmax01 -certreq -file cert-file -storepass {{ssl.keystore.password}} -keypass {{ssl.key.password}}# 创建 cert-signed 后续进行盖章
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 36500 -CAcreateserial -passin pass:{{openssl_password}}# 将 ca-cert 导入server.keystore.jks
keytool -keystore server.keystore.jks -alias UdmaxCA -import -file ca-cert -storepass {{ssl.keystore.password}} -keypass {{ssl.key.password}}# 将 cert-signed 导入server.keystore.jks
keytool -keystore server.keystore.jks -alias udmax01 -import -file cert-signed -storepass {{ssl.keystore.password}} -keypass {{ssl.key.password}}
此时就已经完成了 SSL
文件的准备:
实操截图如下(为了区分证书,每台的 alias
进行改动,其实根本也没必要):
Kafka SSL 配置
conf/server.properties
相关的 SSL
配置为:
ssl.keystore.location=/opt/ca/server.keystore.jks
ssl.truststore.location=/opt/ca/server.truststore.jks
ssl.keystore.password={{ssl.keystore.password}}
ssl.key.password={{ssl.key.password}}
ssl.client.auth=required
其他文件
由于 SSL
其实涉及到加解密的过程,一般只是对外使用,在内网环境可以不开启,所以内网中的各类 shell
连接 9092
端口,无需再进行 SSL
配置,如果需要请参考后续 Java SASL_SSL 程序连接 Demo
。
开启 ACL
如果需要开启 ACL
请优先开启 SASL
,ACL
配置为:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:kafka
开启了 ACL 认证,采用
kafka
自带的授权验证,其中超级用户名叫做kafka
。
启动 Kafka
使用如下命令进行启动:
bin/kafka-server-start.sh -daemon /component/kafka/kafka_2.12-3.4.0/config/server.properties
使用如下命令查看 topics
信息:
/component/kafka/kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --bootstrap-server udmax02.udmax.com:9092 --command-config /component/kafka/kafka_2.12-3.4.0/config/kerberos/client.properties
可能刚刚创建,并未有 topic 信息,但查看是否有报错。
使用如下命令查看 kafka broker
日志:
tailf /component/kafka/kafka_2.12-3.4.0/logs/server.log
Java SASL_SSL 程序连接 Demo
package com.huangyichun;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;import java.util.Properties;public class KafkaConnectionProducerSaslSsl {private static final String TOPIC_NAME = "test-topic";public static void main(String[] args) {System.setProperty("java.security.krb5.conf", "src/main/resources/krb5_kafka.conf");System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_kafka.conf");System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "udmax02.udmax.com:9094,udmax03.udmax.com:9094,udmax04.udmax.com:9094");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src/main/resources/ca/client.truststore.jks");props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "{{ssl.truststore.password}}");props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put("sasl.kerberos.service.name", "kafka");KafkaProducer<String, String> producer = new KafkaProducer<>(props);try {for (int i = 0; i < 1000; i++) {Thread.sleep(5000L);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key" + (i / 10), "message: " + i);producer.send(record);}} catch (InterruptedException e) {e.printStackTrace();}}
}
其中 kafka_client_udmax.conf
中的内容为:
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requiredrdns = falseuseKeyTab=truestoreKey=trueuseTicketCache=falseprincipal="udmax@UDMAX.COM"keyTab="src/main/resources/keytab/udmax.keytab";
};
报错锦集
Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
keytab
文件与 principal
密码不匹配。
Not authorized to access topics [XXX]
没有权限访问 XXX
topic,被 ACL 挡住,使用 kafka-acl.sh
进行赋权。
General SSLEnginge problem, No name matching xxxx.xxx.xx found
SSL 无法找到匹配的 server
证书,大概率是在生成 server.keystore.jks
时将 CN
填写错误,请填写每台 hostname
的值。
总结
如果对您有帮助,请点赞!