kafka三节点集群平滑升级过程指导

一、前言

在这里插入图片描述

Apache Kafka作为常用的开源分布式流媒体平台,可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据,构建对数据流的变化进行实时反应的应用程序,已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。而其中Apache Kafka Connect 作为 Kafka 中用于和其他数据系统流式传输数据的服务,其独立运行版本可以在 Kafka 发布包中通过 bin/connect-standalone.sh 启动,默认会在 8083 端口开启 HTTP REST API 服务,攻击者可以利用基于SASLJAAS 配置和SASL 协议的任意Kafka客户端,对可对连接器(Connector)的配置进行操作,将连接器中的 Kafka 客户端 sasl.jaas.config 属性值设置为 com.sun.security.auth.module.JndiLoginModule(通过 producer.override.sasl.jaas.config, consumer.override.sasl.jaas.configadmin.override.sasl.jaas.config 属性进行配置)时,如果此时连接器连接到攻击者可控的 LDAP 服务器时容易受到反序列化攻击,也称JNDI 注入来实现远程任意代码执行。云平台中,Kafka Connect 服务通常用于提供 Kafka 数据迁移、数据同步的管道能力,其默认 HTTP API 开放于 8083 端口。

    因现场kafka选用版本较低,安全扫描时触发安全风险告警,低于 Kafka 升级3.4.0版本,涉及【Apache Kafka JNDI注入漏洞(CVE-2023-25194)】漏洞,该漏洞可允许远程代码执行,当攻击者可控制kafka-clients连接时的属性,可通过设置 ’ sasl.jaas.config ’ 属性为 ’ com.sun.security.auth.module.JndiLoginModule ’ 进行JNDI注入或反序列化利用,当JDK版本过低或者存在Gadgets时可导致远程代码执行。现场版本kafka_2.13-2.8.0,java version “1.8.0_361”,sasl.jaas.config 配置采用:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule;

☬ 漏洞复现: 执行创建文件,相关安全软件会报:JNDI注入的告警;影响版本: 2.3.0 至 3.3.2 版本Kafka Connect,原则上不影响 Kafka server (broker),但是会级联影响,最好还是升级到3.4.0及以上版本,升级JDK版本,可采用OpenJDK替换,相关经验已验证:OpenJDK1.8.0_362 + Zookeeper3.6.3 + Kafka3.4.0。

在这里插入图片描述

关联资源:官网升级指导、kafka部署快走

二、软件升级

本次软件要升级到3.4.0版本,版本说明见Kafka - Version 3.4.0,升级步骤也可参考官网升级指导。注意这里咱们是从2.1.x升级到3.x,不同于3.x升级,需注意存储 consumer offsets的schema和inter.broker.protocol.version里的版本,一旦升级后不支持降级。Apache Kafka 3.4.0以来,新增了一个系统属性:org.apache.kafka.disallowed.login.modules,用来在SASL JAAS配置中禁用有问题的登录模块,另外默认com.sun.security.auth.module.JndiLoginModule 在该版本中被禁用;另外需注意的是, Kafka 3.0中 Java 8 已注明废弃, 在Apache Kafka 4.0将不再支持;如果启用TLS,Java 11及更高版本的性能会明显更好,因此强烈建议使用它们。对应的zk版本稳定版为 3.5,注意zk需要足够的堆空间(3-5G,看数据量大小);另zk集群不宜过大,尤其是在写操作频繁的使用模式中,意味着会造成大量的集群内通信(写操作和随后的集群成员更新的配额),尽量让ZooKeeper系统尽可能小,并尽可能保持其独立性,以处理负载。

1)滚动升级步骤

1、升级前注意:在待升级节点server.properties文件中添加:inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.3, 3.2, etc.),如果是从0.11.0.x或更高版本升级的,并且没有重写message.format.version,那么只需要配置覆写: inter-broker protocol version参数的kafka版本即可,否者还需要设置:log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION,现场我们只需要添加::inter.broker.protocol.version=3.4;

2、对Broker滚动升级,一次升级一个节点或实例:关闭待升级的broker,解压新版本,然后迁移更新配置,重启新的代理,验证数据同步;这时,最新版本的broker程序会运行,之后可以验证kafka集群的业务行为和性能是否符合预期。如果出现任何问题,目前还可以进行降级回滚。Kafka集群的完整升级过程涵盖了broker侧和client侧,因broker是向下兼容的,升级过程中必需先成功升级所有的broker,对于Client(producer 和 consumer)在broker完成升级之后再升级。

3、一旦验证了集群的业务行为和性能满足预期,就可以通过更改协议版本来应用:inter.broker.protocol.version=3.4

4、然后逐个重启kafka brokers,以让inter.broker.protocol.version=3.4生效;这是,就不再支持降级了;

5、最后完成kafka整个集群状态及数据分布验证。inter.broker.protocol.version的值可参考如下,但官网升级指导里看也可直接写成:inter.broker.protocol.version=2.8(针对本次带升级的2.8版本也可),之后再修改为inter.broker.protocol.version=3.4;这种格式是可以的。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2)关停替换升级

即: 如果可接受停机,可将所有broker关闭后,更新版本替换后重启启动。

3)升级计划或流程

在这里插入图片描述
测试方案:
在这里插入图片描述

2.1、软件下载

#MD5: CF 6B 8B 1C A1 12 9E 69  41 39 92 99 B6 CC 47 8C
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.12-3.4.0.tgz
md5sum kafka_2.12-3.4.0.tgz  #输出
cf6b8b1ca1129e6941399299b6cc478c  ./kafka_2.12-3.4.0.tgz

2.2、单节点/实例kafka升级

注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。特别注意的是,broker之间的通讯协议和message的传输协议要与旧版本的一致,否则升级完的broker会因为通讯协议版本不一致导致节点一直报错(Connection to “broker id” was disconnected before the response was read),即升级新版本需现将旧版本的信息写入新版的配置文件中,以兼容当前(旧版)环境适配。从2.6.0版开始,对于Java 11或更新版本,TLSv1.3是默认启用的。客户机和服务器将协商是否支持TLSv1.3,否则将退回到TLSv1.2。

#1、解压缩新版本kafka
tar -xzf kafka_2.12-3.4.0.tgz
cd ./kafka_2.12-3.4.0/config#2、修改新旧版配置文件,添加版本参数
vim config/server.properties  #新增
inter.broker.protocol.version=0.10.1 #旧版本号#3、旧版查看topic
bin/kafka-topics.sh --bootstrap-server 10.100.1.94:9092 --list

2.3、剩余2节点kafka升级

2.4、集群状态确认

2.5、kafka性能测试

#使用 8 个线程向名为 test-update-perf 的主题发送 500000 条大小为 50000 字节的消息,并将性能统计信息写入位于 ./perf-test 目录中的 CSV 文件中。性能统计信息将每 3000 毫秒报告一次。./bin/kafka-producer-perf-test.sh --messages 500000 --message-size 50000 --topic test-update-perf --threads 8 --broker-list * —show-detailed-stats --csv-reporter-enabled --metrics-dir ./perf-test --reporting-interval 3000#参数说明
--messages 500000:指定要发送的消息总数。
--message-size 50000:指定要发送的每条消息的大小(以字节为单位)。
--topic test-update-perf:指定要发送消息的目标主题。
--threads 8:指定用于发送消息的线程数。
--broker-list *:指定 Kafka 集群中所有代理的列表。星号* 表示所有代理。
--show-detailed-stats:启用详细统计信息的显示,包括每个线程的统计信息。
--csv-reporter-enabled:启用 CSV 报告程序,它将性能统计信息写入 CSV 文件。
--metrics-dir ./perf-test:指定用于存储 CSV 报告程序输出的目录。
--reporting-interval 3000:指定性能统计信息报告的间隔(以毫秒为单位)#示例2:perf-consumer-t4单线程向test-update-perf主题请求消费500000 条大小为 50000 字节的消息,测试性能
/bin/kafka-consumer-perf-test.sh --topic test-update-perf --zookeeper 10.100.1.94:2183 --threads 1 --group perf-consumer-t4 --message-size 50000 --messages 10

三、附录

3.1、Kafka消息发送流程

kafka在消息发送的过程中,涉及到两个线程:main 线程和 Sender 线程。其中,main 线程中会创建了一个队列 RecordAccumulator,main 线程将消息发送给 RecordAccumulator;Sender 线程则不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。示意如下:

在这里插入图片描述

3.2、kafka应用场景

在这里插入图片描述

3.3、kafka架构回顾

在这里插入图片描述

3.4、zookeeper中kafka信息结构

在这里插入图片描述

3.5、Kafka Broker 总工作流程

在这里插入图片描述

3.6、消费者组消费流程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/265585.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis String 类型底层揭秘

目录 前言 String 类型低层数据结构 节省内存的数据结构 前言 Redis 的 string 是个 “万金油” ,这么评价它不为过. 它可以保存Long 类型整数,字符串, 甚至二进制也可以保存。对于key,value 这样的单值,查询以及插…

详解Kotlin中run、with、let、also与apply的使用和区别

Kotlin作为一种现代、静态类型的编程语言,不仅提供了丰富的特性,还提供了极具表现力的函数:run, with, let, also, 和 apply。理解这些函数的不同之处对于编写高效、易于维护的代码至关重要。 函数对比表 函数对象引用返回值使用场景runthi…

猜数游戏(个人学习笔记黑马学习)

案例需求 定义一个数字(1~10,随机产生),通过3次判断来猜出来数字 案例要求: 1.数字随机产生,范围1-10 2.有3次机会猜测数字,通过 3.层嵌套判断实现每次猜不中,会提示大了或小了 提示,通过如下代…

【海贼王的数据航海:利用数据结构成为数据海洋的霸主】链表—单链表

目录 1 -> 链表 1.1 -> 链表的概念及结构 1.2 -> 链表的分类 2 -> 无头单向非循环链表(单链表) 2.1 -> 接口声明 2.2 -> 接口实现 2.2.1 -> 动态申请一个结点 2.2.2 -> 单链表的打印 2.2.3 -> 单链表的尾插 2.2.4 -> 单链表的头插 2.…

消息中间件篇之RabbitMQ-消息不丢失

一、生产者确认机制 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。 当消息没有到交换机就失败了,就会返回publish-confirm。当消息没有到达MQ时&…

2.27数据结构

1.链队 //link_que.c #include "link_que.h"//创建链队 Q_p create_que() {Q_p q (Q_p)malloc(sizeof(Q));if(qNULL){printf("空间申请失败\n");return NULL;}node_p L(node_p)malloc(sizeof(node));if(LNULL){printf("申请空间失败\n");return…

DETR(1):论文详解

文章目录 1. DETR 模型结构2.损失函数2.1 预测结果和GT 的匹配2.2 训练的loss计算3.实验3.1 大物体表现效果好3.2 Transformer Encoder 和Decoder的作用3.3 object query4. 伪代码5. 结论

【《高性能 MySQL》摘录】第 2 章 MySQL 基准测试

文章目录 2.1 为什么需要基准测试2.2 基准测试的策略2.2.1 测试何种指标 2.3 基准测试方法2.3.1 设计和规划基准测试2.3.2 基准测试应该运行多长时间2.3.3 获取系统性能和状态2.3.4 获得准确的测试结果2.3.5 运行基准测试并分析结果2.3.6 绘图的重要性 2.4 基准测试工具…

IntelliJ IDEA 2023:创新不止步,开发更自由 mac/win版

IntelliJ IDEA 2023激活版是一款强大而智能的集成开发环境(IDE),为开发者提供了一系列先进的功能和工具,帮助他们更高效地编写、调试和测试代码。 IntelliJ IDEA 2023 软件获取 IntelliJ IDEA 2023继承了其前代版本的优秀基因,并在此基础上进…

2月28日代码随想录二叉搜索树中的众数

摸了一个寒假了,得赶一赶了 251.二叉搜索树中的众数 给你一个含重复值的二叉搜索树(BST)的根节点 root ,找出并返回 BST 中的所有 众数(即,出现频率最高的元素)。 如果树中有不止一个众数&am…

虚拟机JVM

虚拟机 1、定义jvm 假想计算机 运行在操作系统之上 和硬件之间没有直接交互 包括 一套字节码指令、寄存器、栈、垃圾回收、堆 一个存储方法域 jvm:承担一个翻译工作,动态的将java代码编译成操作系统可以识别的机器码。 从软件层面屏蔽了不同操作系统在底层硬件与指…

查看cuda和cudnn版本

查看cuda 打开命令提示符(Windows键 R,然后输入cmd并回车)。输入nvcc --version或者nvcc -V来获取Cuda的版本信息。 查看cudnn版本 查看Cudnn版本: 进入Cuda安装目录,通常位于C:\Program Files\NVIDIA GPU Computi…

Doris——荔枝微课统一实时数仓建设实践

目录 一、业务介绍 二、早期架构及痛点 2.1 早期架构 2.2 架构痛点 三、技术选型 四、新的架构及方案 五、搭建经验 5.1 数据建模 5.2 数据开发 5.3 库表设计 5.4 数据管理 5.4.1 监控告警 5.4.2 数据备份与恢复 六、收益总结 七、未来规划 原文大佬这篇Doris腾…

STM32 Cubemx配置SPI编程(使用Flash模块)

文章目录 前言一、W25Q64模块介绍二、STM32Cubemx配置SPI三、SPI HAL库操作函数分析3.1查询方式3.2中断方式 四、Flash时序分析1.读器件ID指令2.写使能3.擦除扇区4.页编程5.读数据6.读状态寄存器 五、Flash驱动程序编写1.代码编写测试 总结 前言 本篇文章来为大家讲解一下Flas…

安装极狐GitLab Runner并测试使用

本文继【新版极狐安装配置详细版】之后继续 1. 添加官方极狐GitLab 仓库: 对于 RHEL/CentOS/Fedora: curl -L "https://packages.gitlab.com/install/repositories/runner/gitlab-runner/script.rpm.sh" | sudo bash2. 安装最新版本的极狐G…

STM32 4位数码管和74HC595

4位数码管 在使用一位数码管的时候,会用到8个IO口,那如果使用4位数码管,难道要使用32个IO口吗?肯定是不行的,太浪费了IO口了。把四个数码管全部接一起共用8个IO口,然后分别给他们一个片选。所以4位数码管共…

GO语言基础总结

多态: 定义一个父类的指针(接口),然后把指针指向子类的实例,再调用这个父类的指针,然后子类的方法被调用了,这就是多态现象。 Golang 高阶 goroutine 。。。。。 channel channel的定义 …

LeetCode59. 螺旋矩阵 II(C++)

LeetCode59. 螺旋矩阵 II 题目链接代码 题目链接 https://leetcode.cn/problems/spiral-matrix-ii/ 代码 class Solution { public:vector<vector<int>> generateMatrix(int n) {vector<vector<int>> res(n, vector<int>(n, 0));int startx …

用 Python 自动化处理无聊的事情

“编程最棒的部分就是看到机器做一些有用的事情而获得的胜利。用 Python 将无聊的事情自动化将所有编程视为这些小小的胜利&#xff1b;它让无聊变得有趣。” Hilary Mason&#xff0c;数据科学家兼 Fast Forward Labs 创始人 “我很享受打破东西然后把它们重新组合起来的乐趣…

Vue+SpringBoot打造音乐偏好度推荐系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.1.1 音乐档案模块2.1.2 我的喜好模块2.1.3 每日推荐模块2.1.4 通知公告模块 2.2 用例图设计2.3 实体类设计2.4 数据库设计 三、系统展示3.1 登录注册3.2 音乐档案模块3.3 音乐每日推荐模块3.4 通知公告模…