Apache zookeeper kafka 开启SASL安全认证

背景:我之前安装的kafka没有开启安全鉴权,在没有任何凭证的情况下都可以访问kafka。搜了一圈资料,发现有关于sasl、acl相关的,准备试试。

简介

Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可以水平扩展和具有高吞吐量特性而被广泛使用,并与多类开源分布式处理系统进行集成使用。

  Kafka作为一款开源的、轻量级的、分布式、可分区和具备复制备份的、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。与传统消息系统相比,Kafka能够更好的处理活跃的流数据,让数据在各个子系统中高性能、低延迟地不停流转。

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka集群的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此方案主要介绍SASL方式。

SASL验证分类
验证方式    kafka版本    特点
SASL/PLAIN    0.10.0.0    不能动态添加用户
SASL/SCRAM    0.10.2.0    可以动态添加用户
SASL/Kerberos    0.9.0.0    需要独立部署验证服务
SASL/oauthbearer    2.0.0    需要自己实现接口,实现token的创建和验证,需要额外的oauth服务

使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据

SCRAM认证配置的优点:

  如果使用PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。

  因此,在生产环境中,这种认证方式不符合实际业务场景,不利于后期扩展。然而使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。所以生产环境推荐使用SCRAM+PLAIN搭配的认证方案。

配置zookeeper集群启用SASL

1. 配置zookeeper,启用sasl认证,cat zoo.cfg查看到如下内容:

tickTime=2000
initLimit=1
syncLimit=5
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
admin.serverPort=8888
maxClientCnxns=3000
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=localhost:2888:3888
4lw.commands.whitelist=conf,stat,srvr,mntr.envi
#zk SASL
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true

2. 配置zookeeper JAAS

cat zk_jaas.conf文件内容如下,如果没有改文件则使用vi命令编辑


Server {org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="admin"password="admin123"user_kafka="kafka123";
};
  1. 注意:admin用户 是zk 集群之间使用的。kafka用户 是 broker 与 zk 之间使用的。

3. 修改zkEnv.sh

将上一步添加的 jaas 配置文件添加到zookeeper的环境变量中,zkEnv.sh文件最后添加一行:vim zkEnv.shZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."# 添加如下 新增变量SERVER_JVMFLAGS:export SERVER_JVMFLAGS="-Djava.security.auth.login.config=../conf/zk_jaas.conf"

配置kafka sasl动态认证

SASL/SCRAM认证是把凭证(credential)存储在Zookeeper,使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,所以在启动Kafka broker之前需要创建代理间通信的凭据。这里配置的 Kafka和生产者/消费者之间 采用SASL/PLAIN和SASL/SCRAM两种方式共同完成认证,授权使用ACL方式。PLAIN方式的用户是在jaas文件中写死的,不能动态的添加;SCRAM支持动态的添加用户。

1. 创建用户

配置SASL/SCRAM认证的第一步,是配置可以连接到kafka集群的用户。本案例创建了3个用户:admin,producer,consumer。kafka_server_admin用户用于broker之间的认证通信,producer用户用于生产者连接kafka,consumer用户用于消费者连接kafka 。

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name admin./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name producer./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name consumer

2. 查看创建的用户信息

kafka-configs 脚本是用来设置主题级别参数的。其实,它的功能还有很多。比如在这个例子中,我们使用它来创建 SASL/SCRAM 认证中的用户信息。可以使用下列命令来查看刚才创建的用户数据。

./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users #(可以单独指定某个用户 --entity-name producer,如下)
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

ZK客户端命令行查看:./zkCli.sh  -server localhost:2181ls /config/users

3. 配置kafka jaas文件

配置了用户之后,我们需要为 Broker 创建一个对应的 JAAS 文件。在实际场景中,需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。

Kafka 的 jaas认证配置文件,配置的是登录类,超管密码和管理的帐号密码列表

vim kafka_server_jaas.conf

KafkaServer {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername ="admin"password="admin123"user_admin="admin123"user_producer="producer123"user_consumer="consumer123";
};
KafkaClient {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="admin"password="admin123"user_producer="producer123"user_consumer="consumer123";
};
Client {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="kafka"password="kafka123";
};

KafkaServer中usename配置的是kafka服务端使用的账号和密码,后面的user_xxx事预设的普通帐号认证信息。
 
中间部分配置的是PLAIN认证方式的账户和密码,其中producer1是账户名,producer123是密码。
Client配置了broker到Zookeeper的连接用户名密码,这里要和前面zookeeper配置中的zk_jaas.conf.conf 中 user_kafka 的账号和密码相同。
 
关于这个文件内容,需要注意以下两点:
 
1)不要忘记最后一行和倒数第二行结尾处的分号;
 
2)JAAS 文件中不需要任何空格键。

4. kafka 配置文件启用SASL认证

Kafka 服务配置文件 server.propertis,配置认证协议及认证实现类
cat server.properties其它内容都注释掉,然后追加如下内容:

broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0Host.name=43.138.0.199

5. kafka 启动脚本添加认证文件路径的环境变量

Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置,修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向 jaas 配置文件的绝对路径

vi kafka-server-start.sh

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/lighthouse/kafka_2.12-2.2.1/config/kafka_server_jaas.conf  kafka.Kafka "$@"

 6.  kafka客户端配置

1) 配置consumer.properties和producer.properties,都要加入以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

2) 生产者配置

使用kafka-console-producer.sh脚本测试生产者,由于开启安全认证和授权,此时使用console-producer脚本来尝试发送消息,那么消息会发送失败,原因是没有指定合法的认证用户,因此客户端需要做相应的配置,需要创建一个名为producer.conf的配置文件给producer程序使用。

config目录下创建一个producer.conf的文件,cat producer.conf文件内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="producer" password="producer123";

注意:Topic设置写权限

3) 消费者配置

使用kafka-console-consumer.sh脚本测试生产者,由于开启安全认证和授权,因此客户端需要做相应的配置。需要为 consumer 用户创建consumer.conf给消费者程序,同时设置对topic的读权限。

config目录下创建一个consumer.conf的文件,cat consumer.conf文件内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="consumer" password="consumer123";

注意:Topic设置读权限。

4) 在生产者和消费者启动脚本中引入JAAS文件

vim bin/kafka-console-producer.shif [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"# vim bin/kafka-console-consumer.shif [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

启动kafka

nohup kafka-server-start.sh /path-to-kafka/config/server.properties &

我自己写了一个脚本,同时启动zookeeper和kafka

文件名称叫startup.sh内容如下:

cd /home/lighthouse/zk-3.4.14/zookeeper-3.4.14/bin
./zkServer.sh start
cd /home/lighthouse/kafka_2.12-2.2.1/
nohup bin/kafka-server-start.sh config/server.properties > output.txt &

zookeeper没有启动成功,我找下原因。把整个过程重新整理了一遍,发现zookeeper、kafka启动成功了。

检查验证

./zkCli.sh  -server localhost:2181ls /brokers/ids

1. 生产者测试

1) 创建一个测试主题test_topic

./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic

2)查看创建的Topic

./kafka-topics.sh  --list --zookeeper localhost:2181 test_topic

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

注:kafka开启认证后,当生产者往Topic写数据需要为主题配置权限(write),即对生产者赋予写的权限。

这里使用producer用户认证授权,通过ACL为producer 用户分配操作test_topic权限:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic 'test_topic'

注意allow-后面不要换行(会报错),注意整段命令要放在一行上。

  • 启动生产者发送消息

特别注意:在生产者生产消息之前如果不设置生产者用户的ACL权限会报错:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config ../config/producer.conf./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --producer.config /home/lighthouse/kafka_2.12-2.2.1/config/producer.conf 

使用相对路径和绝对路径都报错了,错误相同

org.apache.kafka.common.KafkaException: Failed to construct kafka producerat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:299)at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:44)at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Login module control flag not specified in JAAS configat org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:110)at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:90)at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:439)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:420)... 3 more

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/277030.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySql入门教程--MySQL数据库基础操作

꒰˃͈꒵˂͈꒱ write in front ꒰˃͈꒵˂͈꒱ ʕ̯•͡˔•̯᷅ʔ大家好&#xff0c;我是xiaoxie.希望你看完之后,有不足之处请多多谅解&#xff0c;让我们一起共同进步૮₍❀ᴗ͈ . ᴗ͈ აxiaoxieʕ̯•͡˔•̯᷅ʔ—CSDN博客 本文由xiaoxieʕ̯•͡˔•̯᷅ʔ 原创 CSDN …

k8s的pod和svc相互访问时网络链路解析

k8s的pod和svc相互访问时网络链路解析 1. k8s环境中pod相互访问1.1. k8s中pod相互访问的整体流程1.2. k8s的相同机器的不同pod相互访问1.3. k8s的不同机器的不同pod相互访问 2. k8s访问svc2.1 nat操作2.2 流量进入到后端pod 3. 疑问和思考3.1 访问pod相互访问为什么不用做nat?…

【InternLM 实战营笔记】基于 InternLM 和 LangChain 搭建MindSpore知识库

InternLM 模型部署 准备环境 拷贝环境 /root/share/install_conda_env_internlm_base.sh InternLM 激活环境 conda activate InternLM 安装依赖 # 升级pip python -m pip install --upgrade pippip install modelscope1.9.5 pip install transformers4.35.2 pip install…

数据结构之单链表

目录 1.问题引入 2.主题介绍 2.1链表的概念和结构 2.2链表的分类 2.3单链表的实现 2.3.1接口实现 2.3.2函数实现 2.3.3函数测试 3.小结 halo&#xff0c;又和大家见面了&#xff0c;今天要给大家分享的是单链表的知识&#xff0c;跟着我的脚步&#xff0c;包学包会哦~ …

中电金信:技术实践|Flink维度表关联方案解析

导语&#xff1a;Flink是一个对有界和无界数据流进行状态计算的分布式处理引擎和框架&#xff0c;主要用来处理流式数据。它既可以处理有界的批量数据集&#xff0c;也可以处理无界的实时流数据&#xff0c;为批处理和流处理提供了统一编程模型。 维度表可以看作是用户来分析数…

人工智能|机器学习——K-means系列聚类算法k-means/ k-modes/ k-prototypes/ ......(划分聚类)

1.k-means聚类 1.1.算法简介 K-Means算法又称K均值算法&#xff0c;属于聚类&#xff08;clustering&#xff09;算法的一种&#xff0c;是应用最广泛的聚类算法之一。所谓聚类&#xff0c;即根据相似性原则&#xff0c;将具有较高相似度的数据对象划分至同一类簇&#xff0c;…

精读《精通 console.log》

1 引言 本周精读的文章是 Mastering JS console.log like a Pro&#xff0c;一起来更全面的认识 console 吧&#xff01; 2 概述 & 精读 console 的功能主要在于控制台打印&#xff0c;它可以打印任何字符、对象、甚至 DOM 元素和系统信息&#xff0c;下面一一介绍。 c…

PSCA电源控制集成之电压和电源域边界

电压域之间的跨越必须是异步的。电源域之间的跨越可以是同步的&#xff0c;也可以是异步的。 在电压域或异步电源域之间的边界处&#xff0c;需要使用域桥来实现所需的协议。 对于电压域之间的边界&#xff0c;或者是异步电源域之间的边界&#xff0c;域桥被分割成两半&#…

基于springboot的七彩云南文化旅游网站的设计与实现(论文+源码)_kaic

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装七彩云南文化旅游网站软件来发挥其高效地信息处理的作用&am…

Linux系列

安装系列 1.MySQL安装 我们要通过rpm&#xff0c;进行MySQL数据库的安装&#xff0c;主要的步骤如下&#xff1a; rpm -qa 查询当前系统中安装的所有软件 rpm -qa | grep mysql 查询当前系统中安装的名称带mysql的软件 rpm -…

七月论文审稿GPT第3.2版和第3.5版:通过paper-review数据集分别微调Mistral、gemma

前言 我司第二项目组一直在迭代论文审稿GPT(对应的第二项目组成员除我之外&#xff0c;包括&#xff1a;阿荀、阿李、鸿飞、文弱等人)&#xff0c;比如 七月论文审稿GPT第1版&#xff1a;通过3万多篇paper和10多万的review数据微调RWKV七月论文审稿GPT第2版&#xff1a;用一万…

Android Kotlin知识汇总(三)Kotlin 协程

Kotlin的重要优势及特点之——结构化并发 Kotlin 协程让异步代码像阻塞代码一样易于使用。协程可大幅简化后台任务管理&#xff0c;例如网络调用、本地数据访问等任务的管理。本主题介绍如何使用 Kotlin 协程解决以下问题&#xff0c;从而让您能够编写出更清晰、更简洁的应用代…

【蓝桥杯选拔赛真题67】python奇偶数位相乘 第十五届青少年组蓝桥杯python选拔赛真题 算法思维真题解析

目录 python奇偶数位相乘 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 七、 推荐资料 1、蓝桥杯比赛 2、考级资料 3、其它资料 python奇偶数位相乘 第十五届蓝桥杯青少年组python比赛选拔赛真题 一…

Qt教程 — 3.4 深入了解Qt 控件:Input Widgets部件(3)

目录 1 Input Widgets简介 2 如何使用Input Widgets部件 2.1 Dial 组件-模拟车速表 2.2 QScrollBar组件-创建水平和垂直滚动条 2.3 QSlider组件-创建水平和垂直滑动条 2.4 QKeySequenceEdit组件-捕获键盘快捷键 Input Widgets部件部件较多&#xff0c;将分为三篇文章介绍…

前端和后端权限控制【笔记】

前端权限设置【笔记】 前言版权推荐前端权限设置需求效果实现资源 后端权限控制1.给所有前端请求都携带token2.添加拦截器3.配置到WebMvcConfiguration4.更多的权限验证 最后 前言 2024-3-15 18:27:26 以下内容源自《【笔记】》 仅供学习交流使用 版权 禁止其他平台发布时删…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:NavDestination)

作为子页面的根容器&#xff0c;用于显示Navigation的内容区。 说明&#xff1a; 该组件从API Version 9开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 该组件从API Version 11开始默认支持安全区避让特性(默认值为&#xff1a;expandSaf…

ubuntu安装docker的详细教程

检查卸载老版本docker ubuntu下自带了docker的库,不需要添加新的源。 但是ubuntu自带的docker版本太低,需要先卸载旧的再安装新的。 注:docker的旧版本不一定被称为docker,docker.io 或 docker-engine也有可能,所以卸载的命令为: sudo apt-get remove -y docker docke…

部署prometheus+Grafana可视化仪表盘监控服务

一、部署prometheus及监控仪表盘 简介 Prometheus是开源监控报警系统和时序列数据库(TSDB)。 Prometheus的基本原理是通过HTTP协议周期性抓取被监控组件的状态&#xff0c;任意组件只要提供对应的HTTP接口就可以接入监控&#xff0c;输出被监控组件信息的HTTP接口被叫做expo…

论文阅读——VSA

VSA: Learning Varied-Size Window Attention in Vision Transformers 方法&#xff1a; 给定输入特征X&#xff0c;VSA首先按照基线方法的例程&#xff0c;将这些标记划分为几个窗口Xw&#xff0c;窗口大小为预定义的w。我们将这些窗口称为默认窗口&#xff0c;并从默认窗口中…

easyExcel 导入、导出Excel 封装公共的方法

文档包含三部分功能 1、easyExcel 公共导出list<对象>方法&#xff0c;可以自定义excel中第一行和样式 2、easyExcel 导入逻辑&#xff0c;结合spring Validator 验证导入数据是否符合规范 3、easyExcel 自定义导出 list<map> 、 list<对象> &#xff08;可…