【shell-10】shell实现的各种kafka脚本

kafka-shell工具

    • 背景
    • 日志 log
    • 一.启动kafka->(start-kafka)
    • 二.停止kafka->(stop-kafka)
    • 三.创建topic->(create-topic)
    • 四.删除topic->(delete-topic)
    • 五.获取topic列表->(list-topic)
    • 六. 将文件数据 录入到kafka->(file-to-kafka)
    • 七.将kafka数据 下载到文件->(kafka-to-file)
    • 八. 查看topic的groupID消费情况->(list-group)

背景

注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。

日志 log

此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用

#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2#调试日志
function log_debug(){content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"[ $LOG_LEVEL -le 1  ] && echo -e "\033[32m"  ${content}  "\033[0m"
}
#信息日志
function log_info(){content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"[ $LOG_LEVEL -le 2  ] && echo -e "\033[32m"  ${content} "\033[0m"
}
#警告日志
function log_warn(){content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"[ $LOG_LEVEL -le 3  ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"[ $LOG_LEVEL -le 4  ] && echo -e "\033[31m" ${content} "\033[0m"
}
~  

一.启动kafka->(start-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/logpid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; thenlog_info "The kafka process does not exist, startting.........................................................................................."
elselog_warn "The kafka process exists and does not need to be started"exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log

二.停止kafka->(stop-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; thenlog_warn "The kafka process does not exist and does not need to be stopped"exit 1
elselog_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"

三.创建topic->(create-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; thenlog_err "错误:请传入topic名称"exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$TOPIC_NAME$";thenlog_warn "$TOPIC_NAME 已经存在,放弃创建"
else# 默认1副本,3分区kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAMElog_info "请执行topic-list检查创建是否成功"
fi
~     

在这里插入图片描述

四.删除topic->(delete-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bashsource /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}# 逐个删除topic
for topic in "$@"
doif ! check_kafka_topic $topic; thenlog_info "tpoic->$topic 不存在,跳过删除行为"continueelselog_info "topic->$topic 执行删除"kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topiclog_info "topic->$topic 删除成功"fi
done

在这里插入图片描述

五.获取topic列表->(list-topic)

#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092  
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; thenlog_info "目标$1 详情如下"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
elselog_info "所有topic 列表如下:"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

在这里插入图片描述

六. 将文件数据 录入到kafka->(file-to-kafka)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; thenlog_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"exit 1
fiif ! [ -f $1 ]; thenlog_err "$1不是一个有效的数据文件"exit 1
fiFILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092  #检查topic是否存在
function check_kafka_topic() {local local_KAFKA_BROKER=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_KAFKA_BROKER$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}#将文件数据推送到kafka
function send_to_kafka(){local local_path=$1local count=0while IFS= read -r line; do  kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line"  count=$((count+1))done < "$local_path"echo $count
}        if ! check_kafka_topic $TOPIC_NAME;thenlog_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"exit 1
filog_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds))  log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."

在这里插入图片描述

七.将kafka数据 下载到文件->(kafka-to-file)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录  
KAFKA_BIN_DIR=/path/to/kafka/bin#kafka 地址  
KAFKA_SERVER=ip:9092 # Kafka的配置文件目录  
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config# Kafka消费者配置文件  
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties# 指定要消费的主题  
TOPIC_NAME=your_topic_name# 指定要写入的文件 
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3log_info "执行检察............................................................................................................................"function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_SERVER  --list | grep -q "^$local_topic_name$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}if ! check_kafka_topic $TOPIC_NAME;thenlog_err "topic->$TOPIC_NAME 未找到"exit 1
fi
log_info "检查通过............................................................................................................................"log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; doif [[ $line == *"PARTITION"* ]]; thencontent="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"echo -e "\033[45m" ${content} "\033[0m"else  log_info "$line"fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe  --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件  
if [ $# -eq 2 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

在这里插入图片描述

八. 查看topic的groupID消费情况->(list-group)

#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $kafka_broker  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}if [ $# -eq 1 ]; thenif ! check_kafka_topic $1; then#topic 不存在则直接退出程序log_warn "topic=$1, 不存在"exit 1filog_info "topic_name=$1 的gruoupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
elselog_info "所有groupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/246230.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

研发日记,Matlab/Simulink避坑指南(六)——字节分割Bug

文章目录 前言 背景介绍 问题描述 分析排查 解决方案 总结归纳 前言 见《研发日记&#xff0c;Matlab/Simulink避坑指南&#xff08;一&#xff09;——Data Store Memory模块执行时序Bug》 见《研发日记&#xff0c;Matlab/Simulink避坑指南(二)——非对称数据溢出Bug》…

【C++】list讲解及模拟

目录 list的基本介绍 list模拟实现 一.创建节点 二.迭代器 1.模版参数 2.迭代器的实现&#xff1a; a. ! b. c. -- d. *指针 e.&引用 整体iterator (与const复用)&#xff1a; 三.功能实现 1.模版参数 2.具体功能实现&#xff1a; 2.1 构造函数 2.2 begi…

大型语言模型基础知识的可视化指南

直观分解复杂人工智能概念的工具和文章汇总 如今&#xff0c;LLM&#xff08;大型语言模型的缩写&#xff09;在全世界都很流行。没有一天不在宣布新的语言模型&#xff0c;这加剧了人们对错过人工智能领域的恐惧。然而&#xff0c;许多人仍在为 LLM 的基本概念而苦苦挣扎&…

python爬虫基础

python爬虫基础 前言 Python爬虫是一种通过编程自动化地获取互联网上的信息的技术。其原理可以分为以下几个步骤&#xff1a; 发送HTTP请求&#xff1a; 爬虫首先会通过HTTP或HTTPS协议向目标网站发送请求。这个请求包含了爬虫想要获取的信息&#xff0c;可以是网页的HTML内…

关于C#中的HashSet<T>与List<T>

HashSet<T> 表示值的集合。这个集合的元素是无须列表&#xff0c;同时元素不能重复。由于这个集合基于散列值&#xff0c;不能通过数组下标访问。 List<T> 表示可通过索引访问的对象的强类型列表。内部是用数组保存数据&#xff0c;不是链表。元素可重复&#xf…

如何利用streamlit 將 gemini pro vision 進行圖片內容介紹

如何利用streamlit 將 gemini pro vision 進行圖片內容介紹 1.安裝pip install google-generativeai 2.至 gemini pro 取 api key 3.撰寫如下文章:(方法一) import json import requests import base64 import streamlit as st 讀取圖片檔案&#xff0c;並轉換成 Base64 編…

mysql 存储过程学习

存储过程介绍 1.1 SQL指令执行过程 从SQL执行的流程中我们分析存在的问题: 1.如果我们需要重复多次执行相同的SQL&#xff0c;SQL执行都需要通过连接传递到MySQL&#xff0c;并且需要经过编译和执行的步骤; 2.如果我们需要执行多个SQL指令&#xff0c;并且第二个SQL指令需要…

145基于matlab的求解悬臂梁前3阶固有频率和振型

基于matlab的求解悬臂梁前3阶固有频率和振型,采用的方法分别是&#xff08;假设模态法&#xff0c;解析法&#xff0c;瑞利里兹法&#xff09;。程序已调通&#xff0c;可直接运行。 145 matlab 悬臂梁 固有频率 振型 (xiaohongshu.com)

Linux 驱动开发基础知识—— LED 驱动程序框架(四)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

[嵌入式软件][启蒙篇][仿真平台] STM32F103实现IIC控制OLED屏幕

上一篇&#xff1a;[嵌入式软件][启蒙篇][仿真平台] STM32F103实现LED、按键 [嵌入式软件][启蒙篇][仿真平台] STM32F103实现串口输出输入、ADC采集 [嵌入式软件][启蒙篇][仿真平台]STM32F103实现定时器 [嵌入式软件][启蒙篇][仿真平台] STM32F103实现IIC控制OLED屏幕 文章目…

DS:单链表的实现(超详细!!)

创作不易&#xff0c;友友们点个三连吧&#xff01; 在博主的上一篇文章中&#xff0c;很详细地介绍了顺序表实现的过程以及如何去书写代码&#xff0c;如果没看过的友友们建议先去看看哦&#xff01; DS&#xff1a;顺序表的实现&#xff08;超详细&#xff01;&#xff01;&…

上位机图像处理和嵌入式模块部署(python opencv)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面我们谈到了qt&#xff0c;谈到了opencv&#xff0c;也谈到了嵌入式&#xff0c;但是没有说明python在这个过程当中应该扮演什么样的角色。open…

风口抓猪-借助亚马逊云科技EC2服务器即刻构建PalWorld(幻兽帕鲁)私服~~~持续更新中

Pocketpair出品的生存类游戏《幻兽帕鲁》最近非常火&#xff0c;最高在线人数已逼近200万。官方服务器亚历山大&#xff0c;游戏开发商也提供了搭建私人专用服务器的方案&#xff0c;既可以保证稳定的游戏体验&#xff0c;也可以和朋友一起联机游戏&#xff0c;而且还能自定义经…

Websocket协议详解

前言 本文主要介绍Websocket是什么以及其协议内容。 WebSocket 协议实现在受控环境中运行不受信任代码的一个客户端到一个从该代码已经选择加入通信的远程主机之间的全双工通信。该协议包括一个打开阶段握手规定以及通信时基本消息帧的定义。其基于TCP之上。此技术的目标是为基…

分布式因果推断在美团履约平台的探索与实践

美团履约平台技术部在因果推断领域持续的探索和实践中&#xff0c;自研了一系列分布式的工具。本文重点介绍了分布式因果树算法的实现&#xff0c;并系统地阐述如何设计实现一种分布式因果树算法&#xff0c;以及因果效应评估方面qini_curve/qini_score的不足与应对技巧。希望能…

基于机器学习的地震预测(Earthquake Prediction with Machine Learning)

基于机器学习的地震预测&#xff08;Earthquake Prediction with Machine Learning&#xff09; 一、地震是什么二、数据组三、使用的工具和库四、预测要求五、机器学习进行地震检测的步骤六、总结 一、地震是什么 地震几乎是每个人都听说过或经历过的事情。地震基本上是一种自…

浪花 - 响应拦截器(强制登录)

1. 配置响应拦截器 import axios from axios;const myAxios axios.create({baseURL: http://localhost:8080/api/, });myAxios.defaults.withCredentials true;// 请求拦截器 myAxios.interceptors.request.use(function (config) {// Do something before request is sentc…

ubuntu设置右键打开terminator、code

前言&#xff1a; 这里介绍一种直接右键打开本地目录下的terminator和vscode的方法。 一&#xff1a;右键打开terminator 1.安装terminator sudo apt install terminator 2.安装nautilus-actions filemanager-actions sudo apt-get install nautilus-actions filemanager…

【小白教程】幻兽帕鲁服务器一键搭建 | 支持更新 | 自定义配置

幻兽帕鲁刚上线就百万在线人数&#xff0c;官方服务器的又经常不稳定&#xff0c;所以这里给大家带来最快捷的搭建教程&#xff0c;废话不多说直接开始。 步骤一&#xff1a;准备服务器 服务器建议 Linux 系统&#xff0c;资源占用低&#xff0c;而且一键脚本只需要一条命令&am…

如何使用宝塔面板配置Nginx反向代理WebSocket(wss)

本章教程&#xff0c;主要介绍一下在宝塔面板中如何配置websocket wss的具体过程。 目录 一、添加站点 二、申请证书 三、配置代理 1、增加配置内容 2、代理配置内容 三、注意事项 一、添加站点 二、申请证书 三、配置代理 1、增加配置内容 map $http_upgrade $connection_…