kafka动态认证 自定义认证 安全认证-亲测成功

kafka动态认证 自定义认证 安全认证-亲测成功

背景

Kafka默认是没有安全机制的,一直在裸奔。用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL),认证机制是SASL/PLAIN。

kafka下载安装

我这里用windows做的测试,部署到Linux上也是一样

官方下载地址:https://kafka.apache.org/downloads

我这里下载的kafka版本是:kafka_2.12-3.5.0.tgz

直接解压,如下图

在这里插入图片描述

启动zookeeper

这里的zookeeper配置其实没有做任何修改,zookeeper这里不做认证控制。

zookeeper配置文件在kafka_2.12-3.5.0\config\zookeeper.properties下,不用做任何修改

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=D:\kafka_2.12-3.5.0\zookeeper# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080#authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
#requireClientAuthScheme=sasl
#jaasLoginRenew=3600000

进入kafka主目录,打开cmd

#启动zookeeper
bin\windows\zookeeper-server-start.bat  config\zookeeper.properties

在这里插入图片描述

zookeeper-server-start.bat 启动脚本

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.IF [%1] EQU [] (echo USAGE: %0 zookeeper.propertiesEXIT /B 1
)rem set KAFKA_OPTS=-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_zookeeper_jaas.conf
SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal

在这里插入图片描述

kafka自定义认证配置

kafka的用户认证,是基于java的jaas。所以我们需要先添加jaas服务端的配置文件。

在kafka_2.12-3.5.0\config目录下新建kafka_jaas.conf 配置信息如下:

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-liang"user_admin="admin-123456"user_liang="liang-123456";
};

注意最后一个属性后面需要加分号!配置是不难理解的,第一行指定PlainLoginModule,算是声明这是一个SASL/PLAIN的认证类型,如果是其他的,那么就需要reqired其他的类。username和password则是用于集群内部broker的认证用的。

这里会让人疑惑的,应该是user_admin和user_liang这两个属性了。这个其实是用来定义用户名和密码的,形式是这样:user_userName=password。所以这里其实是定义了用户admin和用户liang对应的密码。

这一点可以在源码的PlainServerCallbackHandler类中找到对应的信息,kafka源码中显示,对用户认证的时候,就会到jaas配置文件中,通过user_username属性获取对应username用户的密码,再进行校验。当然这样也导致了该配置文件只有重启才会生效,即无法动态添加用户。

写完配置后,需要在kafka的配置中添加jaas文件的路径。在kafka_2.12-3.5.0/bin/kafka-run-class.sh中,找到下面的配置,修改KAFKA_OPTS到配置信息。如下:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (set KAFKA_OPTS=""
)

将上述到KAFKA_OPTS修改为:

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (set KAFKA_OPTS="-Djava.security.auth.login.config=D:\kafka_2.12-3.5.0\config\kafka_jaas.conf"
)

修改Kafka配置文件

配置文件在kafka_2.12-3.5.0\config\server.properties 主要增加如下配置

sasl.enabled.mechanisms = PLAIN
sasl.mechanism.inter.broker.protocol = PLAIN
security.inter.broker.protocol = SASL_PLAINTEXT
listeners = SASL_PLAINTEXT://localhost:9092

其中SASL_PLAINTEXT的意思,是明文传输的意思,如果是SSL,那么应该是SASL_SSL。

这样就算是配置好kafka broker了,接下来启动kafka,观察输出日志,没有错误一般就没问题了。

进入kafka主目录,另外打开一个cmd

#启动kafka
bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述
在这里插入图片描述

使用Kafka客户端工具Kafka Tool连接

此时就可以根据上面配置的用户admin和用户liang和相应的密码去连接了

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其他用户或错误的密码连接就会提示没有权限,用户或密码错误
在这里插入图片描述

动态认证

以上的配置方案除了没有使用SSL加密之外,还存在一个严重的缺陷:用户信息是通过静态配置文件的方式存储的,当对用户信息进行添加、删除和修改的时候都需要重启Kafka集群,而我们知道,作为消息中间件,Kafka的上下游与众多组件相连,重启可能造成数据丢失或重复,Kafka应当尽量避免重启。

如果要动态增加一个用户,得修改kafka_jaas.conf的配置,新增加一个用户,而且还得重启Kafka,这样显然不合适。

解决方案

还好,Kafka允许用户为SASL/PLAIN认证机制提供了自定义的回调函数,如果不希望采用静态配置文件存储用户认证信息的话,只需要编写一个实现了 AuthenticateCallbackHandler 接口的类,然后在配置文件中指明这个类即可,指明的方法为在Kafka配置文件中添加如下内容

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler

引入相关的maven依赖包,pom添加如下依赖包

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>2.8.1</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-cache</artifactId><version>5.7.21</version></dependency>

动态认证的完整代码如下

package com.liang.kafka.auth.handler;import com.alibaba.druid.pool.DruidDataSource;
import com.liang.kafka.auth.util.DataSourceUtil;
import com.liang.kafka.auth.util.PasswordUtil;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;import static com.liang.kafka.auth.constants.Constants.*;/***  kafka自定义认证 sasl/plain二次开发*  liang*/
public class MyPlainServerCallbackHandler implements AuthenticateCallbackHandler  {private static final Logger logger = LoggerFactory.getLogger(MyPlainServerCallbackHandler.class);/*** 数据源*/private DruidDataSource dataSource = null;/*** 是否开启数据库验证开关*/private boolean enableDbAuth;private static final String JAAS_USER_PREFIX = "user_";private List<AppConfigurationEntry> jaasConfigEntries;@Overridepublic void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {//jaas配置信息,初始化一次,这就是为什么plain无法添加用户this.jaasConfigEntries = jaasConfigEntries;logger.info("==============configs:{}", JSON.toJSONString(configs));Object endbAuthObject = configs.get(ENABLE_DB_AUTH);if (Objects.isNull(endbAuthObject)) {logger.error("==============缺少开关配置 enable_db_auth!");enableDbAuth = Boolean.FALSE;return;}enableDbAuth = TRUE.equalsIgnoreCase(endbAuthObject.toString());if (!enableDbAuth) {return;}dataSource = DataSourceUtil.getInstance(configs);}//核心类,获取用户密码后,调用authenticate方法@Overridepublic void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {String username = null;for (Callback callback: callbacks) {if (callback instanceof NameCallback)username = ((NameCallback) callback).getDefaultName();else if (callback instanceof PlainAuthenticateCallback) {PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;boolean authenticated = authenticate(username, plainCallback.password());plainCallback.authenticated(authenticated);logger.info("===============认证 username:{},result:{}", username, authenticated);} elsethrow new UnsupportedCallbackException(callback);}}//用户密码是通过获取jaas文件的属性,属性名就是JAAS_USER_PREFIX变量当前缀+usernameprotected boolean authenticate(String username, char[] password) throws IOException {if (username == null || password == null) {logger.error("===========用户名或密码为空!");return false;} else {//先读取配置文件里的用户验证String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,JAAS_USER_PREFIX + username,PlainLoginModule.class.getName());logger.info("===============读取密码 username:{},pwd:{}", username, expectedPassword);boolean jaasUserBool = expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());if (jaasUserBool) {return true;}//是否开启数据库验证if (enableDbAuth) {return dbAuthenticate(username, password);}return false;}}protected boolean dbAuthenticate(String usernameInfo, char[] passwordCharArray) throws IOException {String password = new String(passwordCharArray);logger.info("=====================begin dbAuthenticate usernameInfo:{},password:{}", usernameInfo, password);String username = usernameInfo;String userQuery = "select\n" +" u.username, u.password\n" +" from u_user u \n" +" where u.state='1' and u.username=?";Connection conn = null;try {conn = dataSource.getConnection();PreparedStatement statement = conn.prepareStatement(userQuery);statement.setString(1, username);ResultSet resultSet = statement.executeQuery();if (resultSet.next()) {String dbPassword = resultSet.getString("password");Boolean bl = PasswordUtil.matches(password, dbPassword);if (Boolean.TRUE.equals(bl)) {logger.info("=====================密码验证成功username:{}", username);} else {logger.error("=====================密码验证失败username:{}", usernameInfo);}return bl;} else {logger.error("=====================认证失败,username:{} 没有找到", usernameInfo);return false;}} catch (Exception e) {logger.error("=====================数据库查询用户异常{}", e);throw new RuntimeException(e);} finally {if (conn != null) {try {conn.close();} catch (SQLException e) {throw new RuntimeException(e);}}}}@Overridepublic void close() throws KafkaException {if (dataSource != null) {dataSource.close();}}}

获取数据源代码

package com.liang.kafka.auth.util;import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.Properties;/*** @author liang* @desc 获取数据源*/
public class DataSourceUtil {private static final Logger LOG = LoggerFactory.getLogger(DataSourceUtil.class);/*** 保证 instance 在所有线程中同步*/private static volatile DruidDataSource dataSource = null;public static synchronized DruidDataSource getInstance(Map<String, ?> configs) {if (dataSource == null || dataSource.isClosed()) {dataSource = initDataSource(configs);}return dataSource;}private static final DruidDataSource initDataSource(final Map<String, ?> configs) {Properties properties = new Properties();for (Map.Entry<String, ?> entry : configs.entrySet()) {if (entry.getKey().startsWith("druid.")) {String key = entry.getKey();String value = (String) entry.getValue();LOG.info("datasource connection config: {}:{}", key, value);properties.setProperty(key, value);}}dataSource = new DruidDataSource();dataSource.configFromPropety(properties);return dataSource;}}

Kafka配置文件中添加数据源的相关配置

enable_db_auth = true
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.liang.kafka.auth.handler.MyPlainServerCallbackHandler
druid.name = mysql_db
druid.type = com.alibaba.druid.pool.DruidDataSource
druid.url = jdbc:mysql://127.0.0.1:3306/test?useSSL=FALSE&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
druid.username = root
druid.password = root
druid.filters = stat
druid.driverClassName = com.mysql.cj.jdbc.Driver
druid.initialSize = 5
druid.minIdle = 2
druid.maxActive = 50
druid.maxWait = 60000
druid.timeBetweenEvictionRunsMillis = 60000
druid.minEvictableIdleTimeMillis = 300000
druid.validationQuery = SELECT 'x'
druid.testWhileIdle = true
druid.testOnBorrow = false
druid.poolPreparedStatements = false
druid.maxPoolPreparedStatementPerConnectionSize = 20

其中:enable_db_auth来控制是否开启动态认证。

编译打成jar包后,需要放到kafka_2.12-3.5.0\libs目录,还使用了相关的依赖包也要放入
在这里插入图片描述

重启Kafka后生效,Kafka的连接认证就会从数据库去查询,想增加,修改,删除用户,直接在数据库表里操作。

参考链接:
https://www.top8488.top/kafka/458.html/
https://zhuanlan.zhihu.com/p/301343840?utm_medium=social&utm_oi=886243404000944128&utm_id=0
https://www.jianshu.com/p/e4c50e4affb8

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/181809.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C/C++】虚函数表

class Animal { public:virtual void speak(){cout << "动物在说话" << endl;} };class Cat :public Animal { public://重写 函数返回值类型 函数名 参数列表 完全相同void speak(){cout << "小猫在说话" << endl;} };void DoSpe…

Linux下yum源配置实战

一、Linux下软件包的管理 1、软件安装方式 ① RPM包管理&#xff08;需要单独解决依赖问题&#xff09; ② YUM包管理&#xff08;需要有网络及YUM仓库的支持&#xff0c;会自动从互联网下载软件&#xff0c;自动解决依赖&#xff09; ③ 源码安装&#xff08;安装过程比较…

GEE数据集——2019、2020、2021、2022和2023年全球固定宽带和移动(蜂窝)网络性能Shapefile 格式数据集

全球固定宽带和移动&#xff08;蜂窝&#xff09;网络性能 全球固定宽带和移动&#xff08;蜂窝&#xff09;网络性能&#xff0c;分配给缩放级别 16 网络墨卡托图块&#xff08;赤道处约 610.8 米 x 610.8 米&#xff09;。数据以 Shapefile 格式和 Apache Parquet 格式提供&…

3.线性神经网络-3GPT版

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.1 线性回归基础优化算法一、线性回归1、买房案例2、买房模型简化3、线性模型4、神经网络5、损失函数6、训练数据7、参数学习8、显示解9、总结 二、 基础优化算法1、梯度下降2、学习率3、小批量随机梯度下降4、批量大小5、…

项目实战:通过axios加载水果库存系统的首页数据

1、创建静态页面 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><link rel"stylesheet" href"style/index.css"><script src"script/axios.mi…

版本控制系统-SVN

SVN Apache Subversion 通常被缩写成 SVN&#xff0c;是一个开放源代码的版本控制系统。 官网&#xff1a;https://subversion.apache.org 资料&#xff1a;https://svnbook.red-bean.com、https://www.runoob.com/svn/svn-tutorial.html 下载&#xff1a;https://sourceforg…

5.数据表基本操作

目录 1.创建数据表 创建数据表的语法格式&#xff1a; 查看当前数据库的表&#xff1a; 主键 1.单字段主键 (1)在定义列的同时指定主键&#xff0c;语法规则如下&#xff1a; (2)在定义完所有列之后指定主键。 2.多字段联合主键 外键&#xff1a; 非空约束&#xff1…

周报4_YMK

FlashAttention 硬件知识 以 A100 (40GB HBM) 为例&#xff0c;下面显示其内存层次结构的粗略图。SRAM内存分布在108个流式多处理器(SMs)上&#xff0c;每个处理器192KB。片上SRAM比HBM快得多&#xff0c;但比HBM小得多&#xff0c;在计算方面&#xff0c;使用Tensor Core的B…

【漏洞复现】Aapache_Tomcat_AJP协议_文件包含漏洞(CVE-2020-1938)

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞扫描3、漏洞验证 说明内容漏洞编号CVE-2020-1938漏洞名称Aapache_Tomcat_AJP文件包含漏洞漏洞评级高…

thinkPHP5怎么打开页面调试,查看网页运行时间

开启trace 在config.php中找到 ‘app_trace’ > false, 修改为&#xff1a; ‘app_trace’ > true,

环形链表和相交链表OJ题

环形链表和相交链表OJ题 这篇博客详细讲解了环形数组和相交数组的问题 文章目录 环形链表和相交链表OJ题一、环形链表e.g.1e.g.2 二、相交链表 一、环形链表 环形列表是指尾结点next不指向NULL了&#xff0c;而是指向包括自己的前面任意一个结点。 e.g.1 题目及要求&#xf…

线程的创建、等待、退出

多线程开发在Linux平台上已经有成熟的pthread库支持&#xff0c;所以使用pthread库在编译时要加上-pthread。其设计的多线程开发的基本概念主要包含3点&#xff1a;线程、互斥锁、条件。其中线程操作又分线程的创建、退出、等待三种。互斥锁包含4种操作&#xff0c;分别是创建、…

【Windows】Google和火狐浏览器禁用更新的操作方式

想必很多网民常用的浏览器是Edge&#xff0c;Google&#xff0c;火狐这三种&#xff0c;但是浏览器都有后台自动更新&#xff0c;更新提示会一直显示&#xff0c;要用户去点击才关掉&#xff0c;有点强迫症的用户就会想要把它一直关掉&#xff0c;可每次打开都关不掉&#xff0…

中期科技:智慧公厕打造智能化城市设施,提升公共厕所管理与服务体验

智慧公厕是利用先进的技术和创新的解决方案来改进公厕的设施和管理。借助物联网、互联网、5G/4G通信、人工智能、大数据、云计算等新兴技术的集成&#xff0c;智慧公厕具备了一系列令人惊叹的应用功能。从监测公厕内部人体活动状态、人体存在状态&#xff0c;到空气质量情况、环…

软件开发项目文档系列之十如何撰写测试用例

目录 1 概述1.1 编写目的1.2 定义1.3 使用范围1.4 参考资料1.5 术语定义 2 测试用例2.1 功能测试2.1.1 用户登录功能2.1.2 商品搜索功能 2.2 性能测试2.2.1 网站响应时间2.2.2 并发用户测试 附件&#xff1a; 测试用例撰写的要素和注意事项附件1 测试用例要素附件2 测试用例的注…

虹科示波器 | 汽车免拆检修 | 2010款江铃陆风X8车发动机怠速抖动、加速无力

一、故障现象 一辆2010款江铃陆风X8车&#xff0c;搭载4G6GS4N发动机&#xff0c;累计行驶里程约为20万km。该车在其他修理厂进行发动机大修&#xff0c;维修后试车&#xff0c;发动机怠速抖动、加速无力。用故障检测仪检测&#xff0c;发动机控制模块&#xff08;ECM&#xff…

时间序列预测模型实战案例(八)(Informer)BestPaper论文模型Informer代码实战讲解

论文地址->Informer论文地址PDF点击即可阅读 代码地址-> 论文官方代码地址点击即可跳转下载GIthub链接 本文介绍 本篇博客带大家看的是Informer模型进行时间序列预测的实战案例&#xff0c;它是在2019年被提出并在ICLR 2020上被评为Best Paper&#xff0c;可以说Inform…

postman做接口测试

之前搞自动化接口测试&#xff0c;由于接口的特性&#xff0c;要验证接口返回xml中的数据&#xff0c;所以没找到合适的轮子&#xff0c;就自己用requests造了个轮子&#xff0c;用着也还行&#xff0c;不过就是case管理有些麻烦&#xff0c;近几天又回头看了看postman也可以玩…

【漏洞复现】Metinfo6.0.0任意文件读取漏洞复现

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现代码审计漏洞点 1.5、深度利用EXP编写 1.6、漏洞挖掘1.7修复建议 1.1、漏洞描述 漏洞名称&#xff1a;MetInfo任意文件…

强大的pdf编辑软件:Acrobat Pro DC 2023中文

Acrobat Pro DC 2023是一款强大的PDF编辑和管理软件&#xff0c;它提供了广泛的功能&#xff0c;使用户能够轻松创建、编辑、转换和共享PDF文档。通过直观的界面和先进的工具&#xff0c;用户可以快速进行文本编辑、图像调整、页面管理等操作&#xff0c;同时支持OCR技术&#…