从kafka和zookeeper中获取生产和消费偏移量

从kafka和zookeeper中获取生产和消费偏移量

  • 特殊说明

    • 该命令是使用python进行编译,需要使用centos7系统上进行使用。
  • 命令详细

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST][-m INTERVAL_MINUTES]Usage of argparseoptional arguments:-h, --help            show this help message and exit-k KAFKA_HOST, --kafka_host KAFKA_HOST需要输入kafka:端口-z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST需要输入zookeeper:端口-m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES间隔分钟
  • 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
  • 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartitiondef get_zoo_consumer_info(Topology):Topology_num = 0zk_cli.start()path = "/stormOffset/" + Topology + "/partition_0"if zk_cli.exists(path):str_data, stat = zk_cli.get(path)str_data = json.loads(str_data)Topology_num =  str_data.get("offset")#print("zookeeper now " + path + " offsets: " + str(Topology_num) )else:   print("Path " + path  + " does not exist.")return Topology_numdef get_kafka_consumer_info(server, topic):partition = 0tp = TopicPartition(topic, partition)end_offset = server.end_offsets([tp])[tp]#print("kafka topic " + topic + " partition " + str(partition) + " offsets: " + str(end_offset))return end_offsetif  __name__ == '__main__':parser = argparse.ArgumentParser(description='Usage of argparse')parser.add_argument('-k','--kafka_host', type=str, default="10.130.25.77:9092",help='需要输入kafka:端口')parser.add_argument('-z','--zookeeper_host', type=str, default="10.130.25.79:2181",help='需要输入zookeeper:端口')parser.add_argument('-m','--Interval_minutes', type=int, default="1",help='间隔分钟')args = parser.parse_args()kafka_host= args.kafka_hostzookeer_host= args.zookeeper_hostKafka_production_topics = "agent,record"Zoo_consumption_topics= "agentTopology,recordTopology"Interval_minutes = args.Interval_minutestry:zk_cli = KazooClient(hosts=zookeer_host)#print("init zookeeper " + zookeer_host + " conn ok")except Exception as e:print("init zookeeper conn error: "+ str(e))try:#kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)#print("init kafka " + kafka_host + "  conn ok")except Exception as e:print("init kafka conn error: "+ str(e))zoo_offset = {}kafka_offset = {}Kafka_production_topics_list = Kafka_production_topics.split(",")Kafka_production_topics_list_2  =  Kafka_production_topics.split(",")Zoo_consumption_topics_list = Zoo_consumption_topics.split(",")Zoo_consumption_topics_list_2 =   Zoo_consumption_topics.split(",")for i in range(0,len(Kafka_production_topics_list)):kafka_topics = Kafka_production_topics_list.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)kafka_offset[kafka_topics]=get_kafka_offset_numzoo_topics = Zoo_consumption_topics_list.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)zoo_offset[zoo_topics]= get_zoo_offset_numprint("Interval " + str(Interval_minutes) + " minutes sleep")print("=======================================================================================")time.sleep(int(Interval_minutes) * 60)for i in range(0,len(Kafka_production_topics_list_2)):kafka_topics = Kafka_production_topics_list_2.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)last_kafka_num = kafka_offset.get(kafka_topics)minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_numzoo_topics = Zoo_consumption_topics_list_2.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)last_zoo_num =  zoo_offset.get(zoo_topics)minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_numDifference = minutes_kafka_offset_num - minutes_zoo_offset_numprint("kafka offsets:",kafka_topics,get_kafka_offset_num,last_kafka_num)print("zookeeper offsets:",kafka_topics,get_zoo_offset_num,last_zoo_num)print(kafka_topics  + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))print("=======================================================================================")zk_cli.stop()# 关闭消费者连接kafka_server.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/430546.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NLP 文本匹配任务核心梳理

定义 本质上是做了意图的识别 判断两个内容的含义(包括相似、矛盾、支持度等)侠义 给定一组文本,判断语义是否相似Yi 分值形式给出相似度 广义 给定一组文本,计算某种自定义的关联度Text Entailment 判断文本是否能支持或反驳这个…

统信服务器操作系统【Cron定时任务服务】

Cron定时任务服务服务介绍、服务管理、服务配置 文章目录 一、功能概述二、功能介绍1. Cron 服务管理2.Cron 服务管理3.Cron 服务配置run-parts一、功能概述 cron是一个可以用来根据时间、日期、月份、星期的组合来 调度对周期性任务执行的守护进程。利用 cron 所提供的功能,可…

插座空置状态检测系统源码分享

插座空置状态检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer…

输电线塔目标检测数据集yolo格式该数据集包括2644张输电线塔高清图像,该数据集已经过yolo格式标注,具有完整的txt标注文件和yaml配置文件。

输电线塔目标检测数据集yolo格式 该数据集包括2644张输电线塔高清图像,该数据集已经过yolo格式标注,具有完整的txt标注文件和yaml配置文件。 输电线塔目标检测数据集 数据集名称 输电线塔目标检测数据集(Transmission Tower Object Detecti…

视频去水印 —— 释放创意,让学习与创作更自由!

🌟 视频去水印 —— 释放创意,让学习与创作更自由! 在这个短视频盛行的时代,抖音、快手、小红书等平台成为了创意与灵感的聚集地。你是否曾遇到过想要学习或进行二次创作,却被视频中的水印所困扰?现在&…

RHCSA认证-Linux(RHel9)-Linux入门

文章目录 概要一、创建、查看和编辑⽂本1.1 输出重定向1.2 vim编辑器1.3 shell 变量1.5 获取帮助 二、管理本地用户和组2.1 描述用户2.2 切换用户和赋权2.3 用户管理2.4 用户组管理2.5 密码策略 三、控制文件访问3.1 列出文件和文件权限3.2 更改文件权限和拥有者3.3 控制默认权…

WPF自定义Dialog模板,内容用不同的Page填充

因为审美的不同,就总有些奇奇怪怪的需求,使用框架自带的对话框已经无法满足了,这里记录一下我这边初步设计的对话框。别问为啥要用模板嵌套Page来做对话框,问就是不想写太多的窗体。。。。 模板窗体(XAML)…

独立游戏《Project:Survival》UE5C++开发日志0——游戏介绍

该游戏是《星尘异变》团队的下一款作品,太空科幻题材的生存游戏,我将负责使用C、蓝图实现游戏的基础框架和核心功能,其中还包含使用人工智能算法助力游戏开发或帮助玩家运营 目前已有功能: 1.3D库存系统:所有库存中的物…

1.6 计算机网络体系结构

参考:📕深入浅出计算机网络 常见的三种计算机网络体系结构 TCP/IP体系结构 路由器一般只包含网络接口层和网际层。 应用层TCP/IP体系结构的应用层包含了大量的应用层协议,例如HTTP、SMTP、DNS、RTP等运输层TCP和UDP是TCP/IP体系结构运输层的…

UWA支持鸿蒙HarmonyOS NEXT

华为在开发者大会上,宣布了鸿蒙HarmonyOS NEXT将仅支持鸿蒙内核和鸿蒙系统的应用,不再兼容安卓应用,这意味着它将构建一个全新且完全独立的生态系统。 为此,UWA也将在最新版的UWA SDK v2.5.0中支持鸿蒙HarmonyOS NEXT&#xff0c…

链表分割-----------lg

现有一链表的头指针 ListNode* pHead,给一定值x,编写一段代码将所有小于x的结点排在其余结点之前,且不能改变原来的数据顺序,返回重新排列后的链表的头指针。 我们可以假设x为36,则小于36都排在前边,>3…

虚幻引擎游戏保存/加载存档功能

函数名功能Does Save Game Exist检查存档是否存在Load Game from Slot加载存档Save Game to Slot保存存档Delete Game in Slot删除存档 Slot Name 是插槽名字 存档都是通过插槽名字来 读取/加载/检查/删除的 先创建一个SaveGame类 , 这个类里可以存放要保存的数据 , 比如 玩家…

【UE5】将2D切片图渲染为体积纹理,最终实现使用RT实时绘制体积纹理【第二篇-着色器制作】

在上一篇文章中,我们已经理顺了实现流程。 接下来,我们将在UE5中,从头开始一步一步地构建一次流程。 通过这种方法,我们可以借助一个熟悉的开发环境,使那些对着色器不太熟悉的朋友们更好地理解着色器的工作原理。 这篇…

思科安全网络解决方案

《网安面试指南》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484339&idx1&sn356300f169de74e7a778b04bfbbbd0ab&chksmc0e47aeff793f3f9a5f7abcfa57695e8944e52bca2de2c7a3eb1aecb3c1e6b9cb6abe509d51f&scene21#wechat_redirect 《Java代码审…

Redis数据持久化总结笔记

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能! Redis 提供了 2 个不同形式的持久化方式 RDB(Redis DataBase&#…

【python】requests 库 源码解读、参数解读

文章目录 一、基础知识二、Requests库详解2.1 requests 库源码简要解读2.2 参数解读2.3 处理响应2.4 错误处理 一、基础知识 以前写过2篇文章: 计算机网络基础: 【socket】从计算机网络基础到socket编程——Windows && Linux C语言 Python实现…

排序----希尔排序

void ShellSort(int* a, int n) {int gap n;while (gap > 1){// 1保证最后一个gap一定是1// gap > 1时是预排序// gap 1时是插入排序gap gap / 3 1;for (size_t i 0; i < n - gap; i){int end i;int tmp a[end gap];while (end > 0){if (tmp < a[end]){…

英伟达NVIDIA数字IC后端笔试真题(ASIC Physical Design Engineer)

今天小编给大家分享下英伟达NVIDIA近两年数字IC后端笔试真题&#xff08;ASIC Physical Design&#xff09; 请使用OR门和INV反相器来搭建下面所示F逻辑表达式的电路图。 数字IC后端设计如何从零基础快速入门&#xff1f;(内附数字IC后端学习视频&#xff09; 2024届IC秋招兆…

WEB领域是不是黄了还是没黄

进入2024年后&#xff0c;WEB领域大批老表失业&#xff0c;一片哀嚎&#xff0c;个个饿的鬼叫狼嚎&#xff0c;为啥呢&#xff0c;下面是我个人的见解和看法。 中国程序员在应用层的集中 市场需求&#xff1a;中国的互联网行业在过去几年中经历了爆炸性增长&#xff0c;尤其是…

RAG技术全面解析:Langchain4j如何实现智能问答的跨越式进化?

LLM 的知识仅限于其训练数据。如希望使 LLM 了解特定领域的知识或专有数据&#xff0c;可&#xff1a; 使用本节介绍的 RAG使用你的数据对 LLM 进行微调结合使用 RAG 和微调 1 啥是 RAG&#xff1f; RAG 是一种在将提示词发送给 LLM 之前&#xff0c;从你的数据中找到并注入…