自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、脚本功能概述

二、生产者性能测试

三、消费者性能测试

四、查看可用主题列表

五、脚本使用示例

六、脚本源码

七、总结


        在大数据处理的领域中,Kafka 扮演着极为重要的角色,它作为一个分布式流处理平台,能够高效地处理大规模的实时数据。而 Kafka 提供了一系列的脚本工具,帮助我们更好地管理和测试 Kafka 集群。今天,我们就来深入探讨一个自定义的 Kafka 脚本 kf-use.sh,了解其功能与使用场景。

一、脚本功能概述

        kf-use.sh 脚本主要提供了两个核心功能:生产者性能测试和消费者性能测试,同时还具备查看可用主题列表以及退出脚本等功能。通过这些功能,我们可以对 Kafka 集群的生产和消费能力进行评估,以便优化集群配置和数据处理流程。

二、生产者性能测试

  1. 参数输入
    • 当选择生产者性能测试功能时,脚本首先会提示用户输入主题名称。如果输入的主题不存在,脚本会要求用户重新输入,确保测试的主题是有效的。
    • 接着,用户需要输入要生产的记录数量,并且脚本会验证输入是否为整数,如果不是则提示重新输入。
    • 最后,用户要输入每条记录的大小(单位为字节),同样会进行整数验证。
  2. 性能测试执行
    • 一旦用户输入了正确的参数,脚本会调用 kafka-producer-perf-test.sh 工具,并传入相应的参数,如主题名称、记录数量、记录大小等,同时设置吞吐量为 -1(表示不限制吞吐量),并指定 Kafka 集群的地址 bigdata01:9092。这样就可以开始对生产者的性能进行测试,测试结果将反映出在给定条件下生产者向指定主题发送数据的效率。

三、消费者性能测试

  1. 主题选择
    • 在消费者性能测试功能中,首先会调用 kafka-topics.sh 工具列出当前可用的主题列表,然后提示用户输入要测试的主题名称。如果输入的主题不存在,脚本会要求重新输入。
  2. 性能测试执行
    • 当用户选择了存在的主题后,脚本会调用 kafka-consumer-perf-test.sh 工具,传入 Kafka 集群地址 bigdata01:9092、主题名称以及要消费的消息数量(这里固定为 100000 条),从而对消费者从指定主题消费数据的性能进行测试,测试结果可以帮助我们了解消费者处理数据的速度和效率。

四、查看可用主题列表

        无论是在生产者还是消费者性能测试功能中,都提供了查看可用主题列表的选项。通过调用 kafka-topics.sh 工具并传入集群地址 bigdata01:9092,可以获取当前 Kafka 集群中所有的主题名称,方便用户了解集群中的数据主题情况,以便做出正确的测试选择。

五、脚本使用示例

假设我们要对一个名为 test_topic 的主题进行生产者性能测试,我们可以按照以下步骤操作:

  1. 运行 kf-use.sh 脚本。
  2. 选择生产者性能测试功能(可能是对应的数字选项,如 1)。
  3. 输入主题名称 test_topic
  4. 输入要生产的记录数量,例如 10000。
  5. 输入每条记录的大小,比如 100 字节。
  6. 脚本会自动执行生产者性能测试,并输出测试结果,包括发送的总字节数、每秒发送的记录数、每秒发送的字节数等信息。

同样,如果要进行消费者性能测试,例如对 test_topic 主题:

  1. 运行 kf-use.sh 脚本并选择消费者性能测试功能(可能是数字 7 后再选择 1)。
  2. 查看可用主题列表,确认 test_topic 存在后输入该主题名称。
  3. 脚本会执行消费者性能测试,并给出消费 100000 条消息的相关性能数据,如每秒消费的消息数等。

六、脚本源码

#!/bin/bashwhile true; do# 命令大全系统界面echo "Kafka 命令大全系统:"echo "1. 主题操作(topics)"echo "2. 生产者操作(producer)"echo "3. 消费者操作(consumer)"echo "4. 配置操作(configs)"echo "5. 消费者组操作(consumer groups)"echo "6. 生产者性能测试(producer perf test)"echo "7. 消费者性能测试(consumer perf test)"echo "0. 退出"read -p "请输入功能选项数字:" choicecase $choice in1)# 主题操作菜单while true; doecho "主题操作功能:"echo "1. 查看所有主题"echo "2. 创建主题"echo "3. 查看某主题详细信息"echo "4. 修改某主题分区数"echo "5. 删除主题"echo "0.返回命令大全系统界面"read -p "请输入主题操作选项数字:" topic_choicecase $topic_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;2)# 创建主题read -p "请输入要创建的主题名称:" topic_namewhile true; doread -p "请输入分区数(整数):" partitionsif [[ $partitions =~ ^[0-9]+$ ]]; thenbreakelseecho "分区数必须是整数,请重新输入。"fidonewhile true; doread -p "请输入副本数(整数,且不超过可用 broker 数量):" replication_factorif [[ $replication_factor =~ ^[0-9]+$ ]]; then# 这里可添加检查副本数不超过可用 broker 数量的逻辑,暂时省略breakelseecho "副本数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions $partitions --replication-factor $replication_factor --topic $topic_name;;3)# 查看某主题详细信息kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看详细信息的主题名称:" topic_to_describekafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic $topic_to_describe;;4)# 修改某主题分区数kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改分区数的主题名称:" topic_to_alterwhile true; doread -p "请输入新的分区数(整数且大于当前分区数):" new_partitionsif [[ $new_partitions =~ ^[0-9]+$ ]]; then# 这里可添加检查新分区数是否大于当前分区数的逻辑,暂时省略breakelseecho "新分区数必须是整数,请重新输入。"fidonekafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic $topic_to_alter --partitions $new_partitions;;5)# 删除主题kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要删除的主题名称:" topic_to_deletekafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic $topic_to_delete;;0)break;;*)echo "无效的主题操作选择。";;esacdone;;2)# 生产者操作菜单while true; doecho "生产者操作功能:"echo "1. 发送消息到指定主题"echo "0.返回命令大全系统界面"read -p "请输入生产者操作选项数字:" producer_choicecase $producer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要发送消息的主题名称:" topic_nameecho "开始发送消息到主题 $topic_name。输入'EXIT'退出发送。"while true; doread -p "请输入消息内容:" messageif [ "$message" = "EXIT" ]; thenbreakfikafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic $topic_name <<< "$message"done;;0)break;;*)echo "无效的生产者操作选择。";;esacdone;;3)# 消费者操作菜单while true; doecho "消费者操作功能:"echo "1. 消费指定主题的消息"echo "2. 从主题开头消费所有消息"echo "0.返回命令大全系统界面"read -p "请输入消费者操作选项数字:" consumer_choicecase $consumer_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要从开头消费消息的主题名称:" topic_namekafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic $topic_name;;0)break;;*)echo "无效的消费者操作选择。";;esacdone;;4)# 配置操作菜单while true; doecho "配置操作功能:"echo "1. 查看主题配置"echo "2. 修改主题配置"echo "0.返回命令大全系统界面"read -p "请输入配置操作选项数字:" config_choicecase $config_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要查看配置的主题名称:" topic_namekafka-configs.sh --bootstrap-server bigdata01:9092 --describe --entity-type topics --entity-name $topic_name;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要修改配置的主题名称:" topic_nameread -p "请输入配置项名称:" config_nameread -p "请输入配置项值:" config_valuekafka-configs.sh --bootstrap-server bigdata01:9092 --alter --entity-type topics --entity-name $topic_name --add-config $config_name=$config_value;;0)break;;*)echo "无效的配置操作选择。";;esacdone;;5)# 消费者组操作菜单while true; doecho "消费者组操作功能:"echo "1. 查看消费者组列表"echo "2. 查看消费者组详情"echo "3. 重置消费者组偏移量"echo "0.返回命令大全系统界面"read -p "请输入消费者组操作选项数字:" consumer_group_choicecase $consumer_group_choice in1)kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --list;;2)read -p "请输入要查看详情的消费者组名称:" group_namekafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --describe --group $group_name;;3)read -p "请输入要重置偏移量的消费者组名称:" group_nameread -p "请输入要重置偏移量的主题名称:" topic_namekafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"kafka-consumer-groups.sh --bootstrap-server bigdata01:9092 --reset-offsets --group $group_name --topic $topic_name --to-earliest;;0)break;;*)echo "无效的消费者组操作选择。";;esacdone;;6)# 生产者性能测试菜单while true; doecho "生产者性能测试功能:"echo "1. 执行生产者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入生产者性能测试选项数字:" producer_perf_choicecase $producer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenwhile true; doread -p "请输入要发送的记录数量(整数):" num_recordsif [[ $num_records =~ ^[0-9]+$ ]]; thenbreakelseecho "记录数量必须是整数,请重新输入。"fidonewhile true; doread -p "请输入每条记录的大小(整数,单位字节):" record_sizeif [[ $record_size =~ ^[0-9]+$ ]]; thenbreakelseecho "记录大小必须是整数,请重新输入。"fidonekafka-producer-perf-test.sh --topic $topic_name --num-records $num_records --record-size $record_size --throughput -1 --producer-props bootstrap.servers=bigdata01:9092elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的生产者性能测试选择。";;esacdone;;7)# 消费者性能测试菜单while true; doecho "消费者性能测试功能:"echo "1. 执行消费者性能测试"echo "2. 查看可用主题列表"echo "0.返回命令大全系统界面"read -p "请输入消费者性能测试选项数字:" consumer_perf_choicecase $consumer_perf_choice in1)kafka-topics.sh --bootstrap-server bigdata01:9092 --listecho "当前可用主题列表如下:"read -p "请输入要测试的主题名称:" topic_nameexisting_topics=$(kafka-topics.sh --bootstrap-server bigdata01:9092 --list)if echo "$existing_topics" | grep -q "$topic_name"; thenkafka-consumer-perf-test.sh --broker-list bigdata01:9092 --topic $topic_name --messages 100000elseecho "主题 $topic_name 不存在,请重新输入。"fi;;2)kafka-topics.sh --bootstrap-server bigdata01:9092 --list;;0)break;;*)echo "无效的消费者性能测试选择。";;esacdone;;0)echo "退出脚本。"break;;*)echo "无效的选择。";;esac
done

七、总结

        kf-use.sh 脚本为我们提供了一个便捷的方式来测试 Kafka 集群的生产者和消费者性能,同时方便地查看可用主题列表。通过合理使用这个脚本,我们可以更好地了解 Kafka 集群在数据生产和消费方面的能力,及时发现潜在的性能瓶颈并进行优化,从而提高整个大数据处理流程的效率。无论是对于 Kafka 初学者还是有一定经验的开发者,这个脚本都是一个非常实用的工具,可以帮助我们更好地管理和优化 Kafka 集群的运行。

        在实际应用中,我们可以根据不同的业务需求和数据处理场景,灵活调整测试参数,深入分析测试结果,以确保 Kafka 集群能够稳定、高效地运行,满足日益增长的大数据处理需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/477269.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浪潮信息自动驾驶框架AutoDRRT 2.0,赋能高阶自动驾驶

随着自动驾驶技术的迅猛进步&#xff0c;BEVTransformer的感知模式为高阶自动驾驶带来了前所未有的精度、泛化能力和多模态融合效果&#xff0c;已成为众多顶尖汽车制造商的首选方案。然而&#xff0c;当前自动驾驶方案中的大模型算法参数规模剧增&#xff0c;对算力、数据IO及…

【电源专题】BUCK电源SW电压的平均值为什么等于输出电压?

在Buck电源测试过程中,我们会去测试SW开关节点的波形。那么从SW波形中我们能看出什么呢? 首先查看SW波形一般会看SW频率,通过SW波形的频率知道目前芯片的运行状态是什么。比如PSM还是PWM模式。 此外,还会看SW波形的占空比,通过占空比我们可以知道目前输出的状态是怎么样的…

微信分账系统供应链分润微信支付 (亲测源码)

搭建环境&#xff1a;nginxphp7.2mysql5.7 1.上传源码到网站根目录并解压 2.导入数据库文件到数据库 3.修改数据库链接文件/.env 4.设置运行目录为/public 5.伪静态设置成tp 6.后台地址&#xff1a;域名/zh9025.php 源码下载&#xff1a;https://download.csdn.net/down…

HTB:Buff[WriteUP]

目录 连接至HTB服务器并启动靶机 信息搜集 使用rustscan对靶机TCP端口进行开放扫描 使用nmap对靶机开放的端口进行脚本、服务扫描 使用curl分别访问靶机的两个端口 使用浏览器访问靶机8080端口页面 漏洞利用 使用searchsploit搜索该WebAPP 通过python2利用该EXP成功ge…

[UE5学习] 一、使用源代码安装UE5.4

一、简介 本文介绍了如何使用源代码安装编译UE5.4&#xff0c;并且新建简单的项目&#xff0c;打包成安卓平台下的apk安装包。 二、使用源代码安装UE5.4 注意事项&#xff1a; 请保证可以全程流畅地科学上网。请保证C盘具有充足的空间。请保证接下来安装下载的visual studi…

遗传算法(Genetic Algorithm, GA)

简介 遗传算法&#xff08;Genetic Algorithm, GA&#xff09;是一种基于自然选择和遗传机制的优化算法&#xff0c;由 John Holland 于20世纪70年代提出。它是一种模拟生物进化过程的启发式搜索算法&#xff0c;被广泛应用于函数优化、机器学习、调度问题等领域。 代码说明 …

【深度学习之回归预测篇】 深度极限学习机DELM多特征回归拟合预测(Matlab源代码)

深度极限学习机 (DELM) 作为一种新型的深度学习算法&#xff0c;凭借其独特的结构和训练方式&#xff0c;在诸多领域展现出优异的性能。本文将重点探讨DELM在多输入单输出 (MISO) 场景下的应用&#xff0c;深入分析其算法原理、性能特点以及未来发展前景。 1、 DELM算法原理及其…

[Redis#0] iredis: linux上redis超好用的环境配置

目录 Features 特征 Install 安装 Pip Brew Linux的 Download Binary 下载 Binary Usage 用法 Using DSN 使用 DSN Change The Default Prompt更改默认提示 Configuration 配置 Keys Development 发展 Release Strategy 发布策略 Setup Environment 设置环境 De…

软件测试——性能测试概念篇

前言&#xff1a;在完成对web网页或者接口的功能测试后&#xff0c;我们还需要考虑性能方面的因素&#xff0c;在学习完性能测试后&#xff0c;目标是能够对个人编写的项目进行性能测试&#xff0c;找到性能不足的地方&#xff08;性能问题个人很难去解决&#xff0c;如&#x…

从搭建uni-app+vue3工程开始

技术栈 uni-app、vue3、typescript、vite、sass、uview-plus、pinia 一、项目搭建 1、创建以 typescript 开发的工程 npx degit dcloudio/uni-preset-vue#vite-ts my-vue3-project2、安装sass npm install -D sass// 安装sass-loader&#xff0c;注意需要版本10&#xff0c;…

探索 .NET 9 控制台应用中的 LiteDB 异步 CRUD 操作

本文主要是使用异步方式&#xff0c;体验 litedb 基本的 crud 操作。 LiteDB 是一款轻量级、快速且免费的 .NET NoSQL 嵌入式数据库&#xff0c;专为小型本地应用程序设计。它以单一数据文件的形式提供服务&#xff0c;支持文档存储和查询功能&#xff0c;适用于桌面应用、移动…

AWS 新加坡EC2 VPS 性能、线路评测及免费注意事项

原文论坛给你更好的阅读讨论体验&#x1f490;&#xff1a; AWS 新加坡EC2 VPS 性能、线路评测及免费注意事项 - VPS - 波波论坛 引言 对于那些习惯薅“羊毛”的朋友来说&#xff0c; AWS 的 免费套餐 可能已经非常熟悉。这台vps是我用外币卡薅的免费的12个月的机器&#xf…

C++ASCII码表和字符操作

目录 1. 引言 2. ASCII码表 2.1 控制字符 2.2 可显示字符 3. 字符操作 3.1 记住几个字符规律 3.2 打印能够显示的ASCII码 3.3 字母大小写转换 3.4 数字转数字字符 1. 引言 在电子计算机中&#xff0c;只能识别由 0 和 1 组成的一串串的二进制数字&#xff0c;为了将人类…

git使用(二)

git使用&#xff08;二&#xff09; git常用基本操作命令git clonegit loggit remotegit statusgit addgit commitgit pushgit branchgit pull git常用基本操作命令 git clone 项目开发中项目负责人会在github上创建一个远程仓库&#xff0c;我们需要使用git clone将远程仓库…

密码学11

概论 计算机安全的最核心三个关键目标&#xff08;指标&#xff09;/为&#xff1a;保密性 Confidentiality、完整性 Integrity、可用性 Availability &#xff0c;三者称为 CIA三元组 数据保密性&#xff1a;确保隐私或是秘密信息不向非授权者泄漏&#xff0c;也不被非授权者使…

netstat -tuln | grep 27017(显示所有监听状态的 TCP 和 UDP 端口,并且以数字形式显示地址和端口号)

文章目录 1. 确定占用端口的进程使用 lsof 命令使用 fuser 命令 2. 结束占用端口的进程3. 修改 MongoDB 配置文件4. 检查 MongoDB 日志文件5. 重新启动 MongoDB 服务6. 检查 MongoDB 服务状态总结 [rootlocalhost etc]# netstat -tuln | grep 27017 tcp 0 0 127.0.…

ElasticSearch7.x入门教程之集群安装(一)

文章目录 前言一、es7.x版本集群安装二、elasticsearch-head安装三、Kibana安装总结 前言 在工作中遇到了&#xff0c;便在此记录一下&#xff0c;以防后面会再次遇到。第一次使用是在2020年末&#xff0c;过了很久了&#xff0c;忘了些许部分了。 在工作当中&#xff0c;如果…

I.MX6U 裸机开发18.GPT定时器实现高精度延时

I.MX6U 裸机开发18.GPT定时器实现高精度延时 一、GPT定时器简介1. GPT 功能2. 时钟源3. 框图4. 运行模式&#xff08;1&#xff09;Restart mode&#xff08;2&#xff09;Free-Run Mode 5. 中断类型&#xff08;1&#xff09;溢出中断 Rollover Interrupt&#xff08;2&#x…

key-value存储实现

文章目录 一、项目简介二、项目流程图三、网络3.1、epoll实现3.2、io_uring实现 四、协议五、存储5.1、array实现5.2、rbtree实现5.3、hash实现 六、测试 一、项目简介 key-value存储其实是一个小型的redis&#xff0c;用户在客户端输入存储相关的指令发送给服务器端&#xff…

大公司如何实现打印机共享的?如何对打印机进行管控或者工号登录后进行打印?异地打印机共享的如何实现可以帮助用户在不同地理位置使用同一台打印机完成打印任务?

大公司如何实现打印机共享的&#xff1f;如何对打印机进行管控或者工号登录后进行打印&#xff1f;异地打印机共享的如何实现可以帮助用户在不同地理位置使用同一台打印机完成打印任务&#xff1f; 如果在局域网内&#xff0c;可以不需要进行二次开发&#xff0c;通过对打印机进…