aws msk加密方式和问控制连接方式

msk加密方式

msk提供了两种加密方式

  • 静态加密
  • 传输中加密

创建集群时可以指定加密方式,参数如下

aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3// encryptioninfo.json
{"EncryptionAtRest": {"DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"},"EncryptionInTransit": {"InCluster": true,"ClientBroker": "TLS"}
}

查看证书位置

$ pwd
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
$ ls -al ../../../../../../../etc/pki/java/cacerts
lrwxrwxrwx 1 root root 40 May  8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
$ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks

测试tls加密,创建client.properties

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

列出端点

$ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
{"BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092","BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094","BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096","BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
}

测试tls连接

$ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets
first# 连接string端点报错
$ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list[2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

指定iam的客户端配置,之后连接tls端口会报错

  • 可见这个tls broker连接仅仅是给Unauthenticated用的,并且如果开了iam认证会失败
./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list[2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
[2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)

msk访问控制

https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html

kafka客户端配置,https://kafka.apache.org/documentation/#security_configclients

msk可选的访问控制/加密组合如下,访问控制方式决定了能够选择的加密方式

AuthenticationClient-broker encryption optionsBroker-broker encryption
UnauthenticatedTLS, PLAINTEXT, TLS_PLAINTEXTCan be on or off
mTLSTLS, TLS_PLAINTEXTMust be on
SASL/SCRAMTLSMust be on
SASL/IAMTLSMust be on

集群完毕后提供了多种连接终端节点

端口信息,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html

在这里插入图片描述

plaintext

采取Unauthenticated方式,客户端使用PLAINTEXT

在这里插入图片描述

查找bootstrap-server端点

在这里插入图片描述

(可选)在bin/client.properties中加入客户端配置

security.protocol=PLAINTEXT

测试连接,不需要特意配置tls连接

./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list

java客户端开启ssl连接

// 开启 tls 连接
properties.put("security.protocol", "SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512");// 创建kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

IAM认证

msk对kafka的源码进行了修改,允许使用iam进行认证,访问事件会发送到cloudtrail中。注意事项

  • 不适用于zk节点

  • 开启iam认证后,allow.everyone.if.no.acl.found配置无效

  • 使用iam认证后创建的kafka acl(存储在zk中),对iam认证无效

  • client和broker之间必须启用tls加密

  • 和连接kafka相关的权限以kafka-cluster作为前缀,https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html

  • 需要使用9098和9198端口

shell连接

需要在客户端配置如下参数

# config/client.properties
# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE> # if don't specify a value for ssl.truststore.location, the Java process uses the default certificate.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
awsProfileName="admin";

下载客户端依赖jar到/libs目录下

https://github.com/aws/aws-msk-iam-auth/releases

aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .

测试连接

./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list

可能出现以下报错

./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list                     Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics(kafka.admin.TopicCommand$)
[2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap spaceat java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)at org.apache.kafka.common.network.Selector.poll(Selector.java:481)at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)at java.lang.Thread.run(Thread.java:750)

指定client配置后成功连接

  • .aws/config中的profile需要写成[profile prod]
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --list
[2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__amazon_msk_canary
__consumer_offsets

创建topic

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --command-config client.properties --topic first --create --partitions 2 --replication-factor 2

发送消息

./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098  --producer.config client.properties --topic first

消费信息

./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first

java代码连接

加入依赖

<dependency><groupId>software.amazon.msk</groupId><artifactId>aws-msk-iam-auth</artifactId><version>1.0.0</version>
</dependency>
// 完整配置
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "AWS_MSK_IAM");
properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
properties.put("sasl.client.callback.handler.class",IAMClientCallbackHandler.class.getName());
properties.put("awsProfileName","admin");

相关报错

// 没有导上面包的报错如下
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModuleat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)... 4 more// 如果没有找到凭证
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,

mTLS

目前中国区不可用,需要依赖于Private CA

SASL/SCRAM

https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html

使用secret manager保存username和password

在这里插入图片描述

创建secret

  • 名称必须以AmazonMSK_开头

  • 不能使用默认kms加密secret

    在这里插入图片描述

  • 密钥内容必须为以下格式

    {"username": "alice","password": "alice-secret"
    }
    

    在这里插入图片描述

shell连接

创建配置文件users_jaas.conf,导出为环境变量

# KafkaClient首字母大写
cat > /tmp/users_jaas.conf << EOF
KafkaClient {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="alice"password="alice-secret";
};
EOFexport KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf

bin目录下创建客户端配置文件

cat > client_sasl.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF

链接集群

./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096  --command-config client_sasl.properties --list

相关报错

# 密码错误
[2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512

java代码连接

java代码连接配置

System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512"); //仅支持SCRAM-SHA-512

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/331484.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【基于springboot+vue的房屋租赁系统】

介绍 本系统是基于springbootvue的房屋租赁系统&#xff0c;数据库为mysql&#xff0c;可用于日常学习和毕设&#xff0c;系统分为管理员、房东、用户&#xff0c;部分截图如下所示&#xff1a; 部分界面截图 用户 管理员 联系我 微信&#xff1a;Zzllh_

Wpf 使用 Prism 实战开发Day24

自定义询问窗口 当需要关闭系统或进行删除数据或进行其他操作的时候&#xff0c;需要询问用户是否要执行对应的操作。那么就需要一个弹窗来给用户进行提示。 一.添加自定义询问窗口视图 (MsgView.xaml) 1.首先&#xff0c;添加一个自定义询问窗口视图 (MsgView.xaml) <Use…

qmt量化教程4----订阅全推数据

文章链接 qmt量化教程4----订阅全推数据 (qq.com) 上次写了订阅单股数据的教程 量化教程3---miniqmt当作第三方库设置&#xff0c;提供源代码 全推就主动推送&#xff0c;当行情有变化就会触发回调函数&#xff0c;推送实时数据&#xff0c;可以理解为数据驱动类型&#xff0…

使用 Flask 和 Celery 构建异步任务处理应用

文章目录 什么是 Flask&#xff1f;什么是 Celery&#xff1f;如何在 Flask 中使用 Celery&#xff1f;步骤 1&#xff1a;安装 Flask 和 Celery步骤 2&#xff1a;创建 Flask 应用程序步骤 3&#xff1a;运行 Celery Worker步骤 4&#xff1a;启动 Flask 应用程序 结论 在构建…

C# NetworkStream 流的详解与示例

文章目录 一、NetworkStream类的基本概念1.1 NetworkStream类概述1.2 NetworkStream类属性1.3 NetworkStream类方法 二、NetworkStream的连接方式三、NetworkStream的传输模式四、NetworkStream类示例服务器端代码&#xff1a;客户端代码&#xff1a; 五、总结 在C#中&#xff…

刷代码随想录有感(77):回溯算法——含有重复元素的全排列

题干&#xff1a; 代码&#xff1a; class Solution { public:vector<int> tmp;vector<vector<int>> res;void backtracking(vector<int> nums, vector<int> used){if(tmp.size() nums.size()){res.push_back(tmp);return;}sort(nums.begin(),…

iCloud 照片到 Android 指南:帮助您快速将照片从 iCloud 传输到安卓手机

​ 概括 iOS 和 Android 之间的传输是一个复杂的老问题。将 iCloud 照片传输到 Android 似乎是不可能的。放心。现在的高科技已经解决了这个问题。尽管 Apple 和 Android 不提供传输工具&#xff0c;但您仍然有其他有用的选项。这篇文章与您分享了 5 个技巧。因此&#xff0c;…

云部署最简单python web

最近在玩云主机&#xff0c;考虑将简单的web应用装上去&#xff0c;通过广域网访问一下&#xff0c;代码很简单&#xff0c;所以新手几乎不会碰到什么问题。 from flask import Flaskapp Flask(__name__)app.route(/) def hello_world():return Hello, World!app.route(/gree…

plsql 学习

过程化编程语言 赋值&#xff1a;&#xff1a; ||&#xff1a;连接符号 dbms_output.put_line() :输出的语句 var_name ACCOUNTLIBRARY.USERNAME%type; 变量名&#xff1b;某个表的数据类型&#xff1b;赋值给变量名 用下面的方法更好用 异常exception 循…

Linux网络编程:HTTP协议

前言&#xff1a; 我们知道OSI模型上层分为应用层、会话层和表示层&#xff0c;我们接下来要讲的是主流的应用层协议HTTP&#xff0c;为什么需要这个协议呢&#xff0c;因为在应用层由于操作系统的不同、开发人员使用的语言类型不同&#xff0c;当我们在传输结构化数据时&…

算法打卡 Day9(字符串KMP 算法)-实现 strStr+ 重复的子字符串

KMP 算法 KMP 算法解决的是字符串匹配的问题&#xff0c;其经典思想是&#xff1a;当出现的字符串不匹配时&#xff0c;可以记录一部分之前已经匹配的文本内容&#xff0c;利用这些信息避免从头再去做匹配。 前缀表 next 数组就是一个前缀表。前缀表是用来回退的&#xff0c…

【启明智显技术分享】SOM2D02-2GW核心板适配ALSA(适用Sigmastar ssd201/202D)

提示&#xff1a;作为Espressif&#xff08;乐鑫科技&#xff09;大中华区合作伙伴及sigmastar&#xff08;厦门星宸&#xff09;VAD合作伙伴&#xff0c;我们不仅用心整理了你在开发过程中可能会遇到的问题以及快速上手的简明教程供开发小伙伴参考。同时也用心整理了乐鑫及星宸…

TypeScript学习日志-第三十二天(infer关键字)

infer关键字 一、作用与使用 infer 的作用就是推导泛型参数&#xff0c;infer 声明只能出现在 extends 子语句中&#xff0c;使用如下&#xff1a; 可以看出 已经推导出类型是 User 了 二、协变 infer 的 协变会返回联合类型&#xff0c;如图&#xff1a; 三、逆变 infer…

【C++】详解AVL树——平衡二叉搜索树

个人主页&#xff1a;东洛的克莱斯韦克-CSDN博客 祝福语&#xff1a;愿你拥抱自由的风 目录 二叉搜索树 AVL树概述 平衡因子 旋转情况分类 左单旋 右单旋 左右双旋 右左双旋 AVL树节点设计 AVL树设计 详解单旋 左单旋 右单旋 详解双旋 左右双旋 平衡因子情况如…

默认路由实现两个网段互通实验

默认路由实现两个网段互通实验 **默认路由&#xff1a;**是一种特殊的静态路由&#xff0c;当路由表中与数据包目的地址没有匹配的表项时&#xff0c;数据包将根据默认路由条目进行转发。默认路由在某些时候是非常有效的&#xff0c;例如在末梢网络中&#xff0c;默认路由可以…

ant design pro 6.0搭建教程

一、搭建 环境&#xff1a; Node.js 18.16.1 ant design pro 6.0 注意&#xff1a;选择umi3时&#xff0c;使用node.js 18版本的会报错&#xff0c;可以实践一下&#xff0c;这里就不再进行实践了。 umi3需要版本是低于node.js 18的 node下载地址&#xff1a; https://nodejs.…

韭菜的自我总结

韭菜的自我总结 股市技术面量价关系左侧右侧右侧技术左侧技术洗盘 韭菜的自我修养虚拟货币的启示韭菜的买入时机韭菜的心理压力成为优秀玩家的关键 股市技术面 技术面分析可以作为买卖时机判定的工具&#xff0c;但是投资还是需要基本面的分析作为支撑。也就是基本面选股&…

【C++】C++的心脏:深入理解内存管理中的 new 和 delete

欢迎来到CILMY23的博客 &#x1f3c6;本篇主题为&#xff1a; C的心脏&#xff1a;深入理解内存管理中的 new 和 delete &#x1f3c6;个人主页&#xff1a;CILMY23-CSDN博客 &#x1f3c6;系列专栏&#xff1a;Python | C | C语言 | 数据结构与算法 | 贪心算法 | Linux &a…

【C++进阶】AVL树

0.前言 前面我们已经学习过二叉搜索树了&#xff0c;但如果我们是用二叉搜索树来封装map和set等关联式容器是有缺陷的&#xff0c;很可能会退化为单分支的情况&#xff0c;那样效率就极低了&#xff0c;那么有没有方法来弥补二叉搜索树的缺陷呢&#xff1f; 那么AVL树就出现了&…

【开源】多语言大型语言模型的革新:百亿参数模型超越千亿参数性能

大型人工智能模型&#xff0c;尤其是那些拥有千亿参数的模型&#xff0c;因其出色的商业应用表现而受到市场的青睐。但是&#xff0c;直接通过API使用这些模型可能会带来数据泄露的风险&#xff0c;尤其是当模型提供商如OpenAI等可能涉及数据隐私问题时。私有部署虽然是一个解决…