Kafka安全模式之身份认证

一、简介

Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。

二、原理介绍

Kafka身份认证主要分为以下几种:

(1)客户端与broker之间的连接认证

(2)broker与broker之间的连接认证

(3)broker与zookeeper之间的连接认证

日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。

图1 客户端与broker之间认证过程图

目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:

(1)基于Kerberos的GSSAPI

SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。

(2)SASL-PLAIN

SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。

(3)SASL-SCRAM

SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。

对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。

三、具体实现

本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。

Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:

  1. Kerberos Database:储存用户密码以及其他信息
  2. Authentication Service(AS):进行用户身份信息验证,为客户端提供Ticket Granting Tickets(TGT)
  3. Ticket Granting Service(TGS):验证TGT,为客户端提供Service Tickets

我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:

  • 用户名principle:标识客户端的用户身份,也即用于登录的用户名
  • 指定用户名对应的秘钥文件xx.keytab:存储了用户的加密密码
  • 指定安全认证的服务配置文件krb5.conf:客户端根据该文件中的信息去访问KDC

获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:

1、将安全认证文件xx.keytabkrb5.conf放置于某一路径下,确保后续java代码可进行读取

2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录

3、修改Kafka生产者配置,开启安全连接

4、调用认证工具类进行登录认证

LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileWriter;
import java.io.IOException;/*** @ProjectName: stdp-security-demo* @Package: * @ClassName: LoginUtil* @Author: stdp* @Description: ${description}*/
public class LoginUtil {public enum Module {KAFKA("KafkaClient"), ZOOKEEPER("Client");private String name;Module(String name) {this.name = name;}public String getName() {return name;}}private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);/*** line operator string*/private static final String LINE_SEPARATOR = System.getProperty("line.separator");/*** jaas file postfix*/private static final String JAAS_POSTFIX = ".jaas.conf";private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");/*** oracle jdk login module*/private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)throws IOException{// 1.check input parametersif ((userPrincipal == null) || (userPrincipal.length() <= 0)){LOGGER.error("input userPrincipal is invalid.");throw new IOException("input userPrincipal is invalid.");}if ((userKeytabPath == null) || (userKeytabPath.length() <= 0)){LOGGER.error("input userKeytabPath is invalid.");throw new IOException("input userKeytabPath is invalid.");}if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0)){LOGGER.error("input krb5ConfPath is invalid.");throw new IOException("input krb5ConfPath is invalid.");}// 2.check file exsitsFile userKeytabFile = new File(userKeytabPath);if (!userKeytabFile.exists()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");}if (!userKeytabFile.isFile()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");}File krb5ConfFile = new File(krb5ConfPath);if (!krb5ConfFile.exists()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");}if (!krb5ConfFile.isFile()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");}// 3.set and check krb5configsetKrb5Config(krb5ConfFile.getAbsolutePath());//        LOGGER.info("check zookeeper server Principal =============================================");setZookeeperServerPrincipal(userPrincipal);
//        LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");setJaasFile(userPrincipal,userKeytabPath);LOGGER.info("Login success!!!!!!!!!!!!!!");}public static void setKrb5Config(String krb5ConfigFile) throws IOException {System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);if (ret == null) {LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");}if (!ret.equals(krb5ConfigFile)){LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");}}public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;LOGGER.info("jaasPath = {}",jaasPath);//windows路径下分隔符替换jaasPath = jaasPath.replace("\\","\\\\");userKeytabPath = userKeytabPath.replace("\\","\\\\");//删除jaas文件deleteJaasFile(jaasPath);writeJaasFile(jaasPath,userPrincipal,userKeytabPath);System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);}private static void deleteJaasFile(String jaasPath) throws IOException {File jaasFile = new File(jaasPath);if (jaasFile.exists()){if (!jaasFile.delete()){throw new IOException("failed to delete exists jaas file.");}}}private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {FileWriter writer = new FileWriter(new File(jaasPath));try{writer.write(getJaasConfContext(userPrincipal,userKeytabPath));writer.flush();}catch (IOException e){throw new IOException("Failed to create jaas.conf File.");}finally {writer.close();}}private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{Module[] allModule = Module.values();StringBuffer builder = new StringBuffer();for (Module module: allModule){String serviceName = null;if ("Client".equals(module.getName())){serviceName = "zookeeper";}else if ("KafkaClient".equals(module.getName())){serviceName = "kafka";}builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));}return builder.toString();}private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {StringBuffer builder = new StringBuffer();if (IS_IBM_JDK){builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append("credsType=both").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}else {builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);builder.append("useKeyTab=true").append(LINE_SEPARATOR);builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("useTicketCache=false").append(LINE_SEPARATOR);builder.append("storeKey=true").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}return builder.toString();}public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);if (ret == null) {LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");}if (!ret.equals(zkServerPrincipal)){LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");}}
}

经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。

四、常见问题

1、认证文件找不到

这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。

2、 principal和keytab不匹配

不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:

  •  配置文件中出现问题:检查kerberos.principle和kerberos.keytab中的用户名(即hkjj)是否一致。

  •  检查生成的jaas文件中用户名和配置的用户名是否相同

如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。

为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。

3、用户密码keytab更新,导致出现checksum failed

这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab

4jaas文件找不到

该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。

该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。

五、总结

在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。

基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/267834.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

逻辑回归与决策边界解析

目录 前言1 逻辑回归基础1.1 Sigmoid函数&#xff1a;打开分类之门1.2 决策函数&#xff1a;划定分类界限1.3 逻辑回归详解 2 决策边界2.1 线性决策边界2.2 非线性决策边界2.3 决策边界的优化 3 应用与实例3.1 垃圾邮件分类&#xff1a;精准过滤3.2 金融欺诈检测&#xff1a;保…

张俊将出席用磁悬浮技术改变生活演讲

演讲嘉宾&#xff1a;张俊 空压机销售总监 亿昇(天津)科技有限公司 演讲题目&#xff1a;用磁悬浮技术改变生活 会议简介 “十四五”规划中提出&#xff0c;提高工业、能源领城智能化与信息化融合&#xff0c;明确“低碳经济”新的战略目标&#xff0c;热能产业是能源产业和…

逆向案例四:360k静态和精灵数据动态AES解密,用js的方法

一、360K 网页链接:https://www.36kr.com/p/2672600261670407 页面中有静态的需要解密的内容&#xff0c;确定html包&#xff0c;确定方法 1.1方法步骤 在下方的搜索中输入decrypt(或者关键字window.initialState &#xff0c;进入js文件 在AES.decrypt处打上断点&#xff0…

Tomcat基础及与Nginx实现动静分离,搭建高效稳定的个人博客系统

目录 引言 一、TOMCAT基础功能 &#xff08;一&#xff09;自动解压war包 &#xff08;二&#xff09;状态页 1.登录状态页 2.远程登录 &#xff08;三&#xff09;服务管理界面 &#xff08;四&#xff09;Host虚拟主机 1.设置虚拟主机 2.建立站点目录与文件 二、实…

【IC前端虚拟项目】inst_buffer子模块DS与RTL编码

【IC前端虚拟项目】数据搬运指令处理模块前端实现虚拟项目说明-CSDN博客 需要说明一下的是,在我所提供的文档体系里,并没有模块的DS文档哈,因为实际项目里我也不怎么写DS毕竟不是每个公司都和HISI一样对文档要求这么严格的。不过作为一个培训的虚拟项目,还是建议在时间充裕…

6U VPX全国产飞腾D2000/8核+复旦微FPGA信息处理主板

产品特性 产品功能 飞腾计算平台&#xff0c;国产化率100% VPX-MPU6503是一款基于飞腾D2000/8核信息处理主板&#xff0c;采用由飞腾D2000处理器飞腾X100桥片的高性能计算机模块&#xff0c;双通道16G贴装内存&#xff0c;板载128G 固态SSD&#xff1b;预留固态盘扩展接口&…

Image Fusion via Vision-Language Model【文献阅读】

阅读目录 文献阅读AbstractIntroduction3. Method3.1. Problem Overview3.2. Fusion via Vision-Language Model 4. Vision-Language Fusion Datasets5. Experiment5.1Infrared and Visible Image Fusion 6. Conclusion个人总结 文献阅读 原文下载&#xff1a;https://arxiv.or…

ScanDomainEuorg:批量查询 eu.org 域名注册情况(附带源码)

引言 eu.org 很长时间都没有审批了&#xff0c;但是我觉得只是时间长短问题&#xff0c;早晚会再次审批的。 既然如此&#xff0c;大可以未雨绸缪一般&#xff0c;趁着大家对其“失望”的时间段&#xff0c;看看有哪些好看的前缀没有被注册。 原理 灵感来源于 域名 .eu.org…

Python推导式大全与实战:精通列表、字典、集合和生成器推导式【第115篇—python:推导式】

Python推导式大全与实战&#xff1a;精通列表、字典、集合和生成器推导式 Python语言以其简洁、优雅的语法而闻名&#xff0c;其中推导式是其独特之处之一。推导式是一种在一行代码中构建数据结构的强大方式&#xff0c;它涵盖了列表、字典、集合和生成器。本篇博客将全面介绍…

解决 MySQL 未运行但锁文件存在的问题

查看mysql状态时&#xff0c;显示错误信息"ERROR! MySQL is not running, but lock file (/var/lock/subsys/mysql) exists"。 解决步骤 1、检查 MySQL 进程是否正在运行 在继续之前&#xff0c;我们首先需要确定 MySQL 进程是否正在运行。我们可以使用以下命令检查…

BUGKU bp

打开环境&#xff0c;他提示了弱密码top1000&#xff0c;随便输入密码123抓包爆破 发现长度都一样&#xff0c;看一下响应发现一段js代码&#xff0c;若r值为{code: bugku10000}&#xff0c;则会返回错误&#xff0c;通过这一句“window.location.href success.php?coder.cod…

JAVAEE初阶 JVM(二)

垃圾回收和双亲委派模型 1.双亲委派模型2.垃圾回收机制(1) 识别垃圾1.引用计数2.可达性分析 (2) 销毁垃圾1.标记清除2.复制算法3.标记整理 3.分代回收 1.双亲委派模型 描述了如何查找.class文件的策略. 同时JVM中有专门进行类加载的操作,有一个模块,叫做类加载器. 上述就是为了…

数据库-第二/三章 关系数据库和标准语言SQL【期末复习|考研复习】

前言 总结整理不易&#xff0c;希望大家点赞收藏。 给大家整理了一下计数据库系统概论中的重点概念&#xff0c;以供大家期末复习和考研复习的时候使用。 参考资料是王珊老师和萨师煊老师的数据库系统概论(第五版)。 文章目录 前言第二、三章 关系数据库和标准语言SQL2.1 关系2…

论文阅读-CheckFreq:频繁、精细的DNN检查点操作。

论文名称&#xff1a;CheckFreq: Frequent, Fine-Grained DNN Checkpointing. 摘要 训练深度神经网络(DNNs)是一项资源密集且耗时的任务。在训练过程中&#xff0c;模型在GPU上进行计算&#xff0c;重复地学习权重&#xff0c;持续多个epoch。学习到的权重存在GPU内存中&…

视频在线压缩

video2edit 一款免费的在线视频编辑软件&#xff0c;可以进行视频合并、视频剪辑、视频压缩以及转换视频格式等。 链接地址&#xff1a;在线视频编辑器和转换器 - 编辑&#xff0c;转换和压缩视频文件 打开视频压缩页面&#xff0c;上传想要压缩视频&#xff0c;支持MP4&…

智能边缘小站 CloudPond(低延迟、高带宽和更好的数据隐私保护)

智能边缘小站 CloudPond(低延迟、高带宽和更好的数据隐私保护) 边缘小站的主要功能是管理用户在线下部署的整机柜设施&#xff0c;一个边缘小站关联一个华为云指定的区域和一个用户指定的场地&#xff0c;相关的资源运行状况监控等。 边缘计算 迈入5G和AI时代&#xff0c;新…

Matlab 机器人工具箱 Link类

文章目录 1 Link类1.1 机械臂Link类1.2 构造函数1.3 信息/显示方法1.4 转换方法1.5 操作方法1.6 测试方法1.7 重载操作1.8 属性(读/写)1.9 例子2 Link.Link2.1 创建机器人连杆对象2.2 OPTIONS2.3 注意2.4 旧语法2.5 例子3 Link的其他函数3.1 Link.A3.2 Link.char3.3 Link.displ…

RFID(Radio Frequency Identification)技术笔记

一、RFID的介绍 RFID&#xff0c;全称为Radio Frequency Identification&#xff0c;即射频识别技术&#xff0c;也常被称为电子标签或无线射频识别。它是一种非接触式的自动识别技术&#xff0c;通过射频信号自动识别目标对象并获取相关数据&#xff0c;识别过程无需人工干预&…

Netty01NIO

NIO基础 NIO &#xff1a;non-blocking io 非阻塞 IO 笔记 www.zgtsky.top 网课&#xff1a;黑马Netty 三大组件 Channel & Buffer channel 有一点类似于 stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从 channel 将数据读入 buffer&#xff0c;也可以…

电子电气架构——汽车DoIP诊断通信建立流程

电子电气架构——汽车DoIP诊断通信建立流程 我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 人们会在生活中不断攻击你。他们的主要武器是向你灌输对自己的怀疑:你的价值、你的能力、你的潜力。他们往往会…