41.3 将重查询记录增量更新到consul和redis中

本节重点介绍 :

  • 将重查询记录增量更新到consul中
  • 同时将record记录更新到本地
  • 更新到redis中

将重查询记录增量更新到consul中

封装consul-client


class Consul(object):def __init__(self, host, port):'''初始化,连接consul服务器'''self._consul = consul.Consul(host, port)def RegisterService(self, name, host, port, tags=None):tags = tags or []# 注册服务self._consul.agent.service.register(name,name,host,port,tags,# 健康检查ip端口,检查时间:5,超时时间:30,注销时间:30scheck=consul.Check().tcp(host, port, "5s", "30s", "30s"))def GetService(self, name):services = self._consul.agent.services()service = services.get(name)if not service:return None, Noneaddr = "{0}:{1}".format(service['Address'], service['Port'])return service, addrdef delete_key(self, key='prometheus/records'):res = self._consul.kv.delete(key, recurse=True)return resdef get_list(self, key='prometheus/records'):res = self._consul.kv.get(key, recurse=True)data = res[1]if not data:return {}pre_record_d = {}for i in data:v = json.loads(i.get('Value').decode("utf-8"))pre_record_d[v.get('record')] = v.get('expr')return pre_record_ddef set_data(self, key, value):'''self._consul.kv.put('prometheus/records/1',json.dumps({"record": "nyy_record_test_a","expr": 'sum(kafka_log_log_size{project=~"metis - main1 - sg2"}) by (topic)'}))'''self._consul.kv.put(key, value)def get_b64encode(self, message):message_bytes = message.encode('ascii')base64_bytes = base64.b64encode(message_bytes)return base64_bytes.decode("utf8")def txn_mset(self, record_expr_list):lens = len(record_expr_list)logging.info("top_lens:{}".format(lens))max_txn_once = 64yu_d = lens // max_txn_onceyu = lens / max_txn_onceif lens <= max_txn_once:passelse:max = yu_dif yu > yu_d:max += 1for i in range(0, max):sli = record_expr_list[i * max_txn_once:(i + 1) * max_txn_once]self.txn_mset(sli)return True'''{"KV": {"Verb": "<verb>","Key": "<key>","Value": "<Base64-encoded blob of data>","Flags": 0,"Index": 0,"Session": "<session id>"}}:return:'''txn_data = []logging.info("middle_lens:{}".format(len(record_expr_list)))for index, data in record_expr_list:txn_data.append({"KV": {"Key": "{}/{}".format(CONSUL_RECORD_KEY_PREFIX, index),"Verb": "set","Value": self.get_b64encode(json.dumps(data)),}})# TODO local test# print(txn_data)# return Trueres = self._consul.txn.put(txn_data)if not res:logging.error("txn_mset_error")return Falseif res.get("Errors"):logging.error("txn_mset_error:{}".format(str(res.get("Errors"))))return Falsereturn True

获取consul对象

    consul_client = Consul(CONSUL_HOST, CONSUL_PORT)if not consul_client:logging.fatal("connect_to_consul_error")

获取历史key增量更新

  • 做增量更新的意义是避免 重复添加相同的重查询
  • 使用set做增量更新
  • 最终一个key的示例和prometheus 的record yaml匹配
groups:
- name: my_recordinterval: 30srules:- record: hke:heavy_expr:0211d8a2fcdefee8e626c86ba3916281expr: sum(delta(kafka_topic_partition_current_offset{instance=~'1.1.1.1:9308', topic=~".+"}[5m])/5) by (topic)
  • 代码
    ##  consul中的历史记录pre_dic = consul_client.get_list(key=CONSUL_RECORD_KEY_PREFIX)old_len = len(pre_dic) + 1## 增量更新old_key_set = set(pre_dic.keys())this_key_set = set(res_dic.keys())## 更新的keysnew_dic = {}today_all_dic = {}new_key_set = this_key_set - old_key_setlogging.info("new_key_set:{} ".format(len(new_key_set)))for k in new_key_set:new_dic[k] = res_dic[k]record_expr_list = []for k in sorted(new_dic.keys()):record_expr_list.append({"record": k, "expr": new_dic.get(k)})today_all_dic.update(pre_dic)today_all_dic.update(new_dic)local_record_expr_list = []for k in sorted(today_all_dic.keys()):local_record_expr_list.append({"record": k, "expr": today_all_dic.get(k)})logging.info("get_all_record_heavy_query:{} ".format(len(local_record_expr_list)))

写入到本地record yaml中为了记录

    # 写到本地record yaml中write_record_yaml_file(local_record_expr_list)
def write_record_yaml_file(record_expr_list):'''data = {"groups": [{"name": "example","rules": [{"record": "nyy_record_test_a","expr": "sum(kafka_log_log_size{project=~"metis-main1-sg2"}) by (topic)"},],},]}'''data = {"groups": [{"name": "heavy_expr_record","rules": record_expr_list,},]}with open("{}/record_{}_{}.yml".format(PROME_RECORD_FILE, len(record_expr_list), now_date_str()), 'w') as f:yaml.dump(data, f, default_flow_style=False, sort_keys=False)

给每一条记录加上 序号,为了后续confd分片

  • record_expr_list中的记录已经排序了
    # 写入consul中new_record_expr_list = []for index, data in enumerate(record_expr_list):new_record_expr_list.append((index + old_len, data))if new_record_expr_list:consul_w_res = consul_client.txn_mset(new_record_expr_list)if not consul_w_res:logging.fatal("write_to_consul_error")else:logging.info("zero_new_heavy_record:{}")

增量写入redis中

    # 步骤 4 写入redis中if new_dic:mset_record_to_redis(new_dic)
def mset_record_to_redis(res_dic):if not res_dic:logging.fatal("record_expr_list empty")rc = redis_conn()if not rc:logging.fatal("failed to connect to redis-server")mset_res = rc.mset(res_dic)logging.info("mset_res:{} len:{}".format(str(mset_res), format(len(res_dic))))sadd_res = rc.sadd(REDIS_SET_KEY, *res_dic.keys())logging.info("sadd_res:{}".format(str(sadd_res)))smems = rc.smembers(REDIS_SET_KEY)logging.info("smember_res_len:{}".format(len(smems)))

将这个python运行加入playbook中

  • prome_heavy_expr_parse.yaml
- name:  localhosthosts: localhostuser: rootgather_facts:  falsevars_files:- config.yamltasks:- name:  merge resultshell: cd {{ prome_query_log.local_work_dir }}/../  && /usr/bin/python3 {{ prome_query_log.py_name }}connection: localrun_once: trueregister: result- name: Show debug infodebug: var=result verbosity=0

本节重点介绍 :

  • 将重查询记录增量更新到consul中
  • 同时将record记录更新到本地
  • 更新到redis中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/499090.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink operator实现自动扩缩容

官网文档位置&#xff1a; 1.Autoscaler | Apache Flink Kubernetes Operator 2.Configuration | Apache Flink Kubernetes Operator 1.部署K8S集群 可参照我之前的文章k8s集群搭建 2.Helm安装Flink-Operator helm repo add flink-operator-repo https://downloads.apach…

LeetCode--排序算法(堆排序、归并排序、快速排序)

排序算法 归并排序算法思路代码时间复杂度 堆排序什么是堆&#xff1f;如何维护堆&#xff1f;如何建堆&#xff1f;堆排序时间复杂度 快速排序算法思想代码时间复杂度 归并排序 算法思路 归并排序算法有两个基本的操作&#xff0c;一个是分&#xff0c;也就是把原数组划分成…

加密流量TLS1.2 和TLS1.3的握手区别

加密流量TLS1.2 和TLS1.3的握手区别 TLS1.2 握手均是明文 1&#xff09;Client Hello 2&#xff09;Server Hello 3&#xff09;Certificate TLS1.3 握手中Client Hello是明文&#xff0c;而Server Hello中Extensions以及后面的握手信息不可见 1&#xff09;Client Hello…

5分钟掌握python中的匿名函数

lambda表达式&#xff0c;又称匿名函数&#xff0c;常用来表示内部仅包含1行表达式的函数。如果一个函数的函数体仅有 1 行表达式&#xff0c;则该函数就可以 用 lambda 表达式来代替。 lambda 表达式的语法格式如下&#xff1a; name lambda [list] : 表达式 其中&#xff…

单元测试3.0+ @RunWith(JMockit.class)+mock+Expectations

Jmockit使用笔记_基本功能使用Tested_Injectable_Mocked_Expectations_jmockit.class-CSDN博客 测试框架Jmockit集合junit使用 RunWith(JMockit.class) 写在测试案例类上的注解 Tested 在测试案例中,写在我们要测试的类上面, 一般用实现类 Injectable 在测试案例中声明…

保姆级教程Docker部署ClickHouse镜像

目录 1、安装Docker及可视化工具 2、创建挂载目录 3、获取配置文件 4、运行ClickHouse容器 5、Compose运行ClickHouse容器 6、查看ClickHouse运行状态 7、安装包部署 1、安装Docker及可视化工具 Docker及可视化工具的安装可参考&#xff1a;Ubuntu上安装 Docker及可视化…

如何通过深度学习提升大分辨率图像预测准确率?

随着科技的不断进步&#xff0c;图像处理在各个领域的应用日益广泛&#xff0c;特别是在医疗影像、卫星遥感、自动驾驶、安防监控等领域中&#xff0c;大分辨率图像的使用已经成为了一项不可或缺的技术。然而&#xff0c;大分辨率图像带来了巨大的计算和存储压力&#xff0c;同…

Spring实现Logback日志模板设置动态参数

版权说明&#xff1a; 本文由博主keep丶原创&#xff0c;转载请保留此块内容在文首。 原文地址&#xff1a; https://blog.csdn.net/qq_38688267/article/details/144842327 文章目录 背景设计日志格式实现配置动态取值logback-spring.xml 背景 多个单体服务间存在少量交互&…

【无线传感网】无线传感器网络拓扑控制技术

文章目录 拓扑控制的意义影响整个网络的生存时间减小节点间通信干扰&#xff0c;提高网络通信效率为路由协议、时间同步提供基础影响数据融合弥补节点失效的影响 拓扑控制的设计目标能量消耗覆盖度连通性算法的分布式程度网络延迟&#x1f6a9;干扰和竞争对称性鲁棒性和可扩展性…

如何在没有 iCloud 的情况下将联系人从 iPhone 传输到 iPhone

概括 近期iOS 13.5的更新以及苹果公司发布的iPhone SE在众多iOS用户中引起了不小的轰动。此外&#xff0c;不少变化&#xff0c;如暴露通知 API、Face ID 增强功能以​​及其他在 COVID-19 期间与公共卫生相关的新功能&#xff0c;吸引了 iPhone 用户尝试新 iPhone 并更新到最…

matlab 设计滤波器

滤波器可视化工具 fvtool 与 filterAnalyzer 设计滤波器&#xff1a; matlab 菜单栏 APP - 滤波器设计

Keil中的gcc

文章目录 一、IDE背后的命令1.1 IDE是什么1.2 IDE的背后是命令1.3 有两套主要的编译器 二、准备工作2.1 arm-linux-gcc和gcc是类似的2.2 Code::Blocks2.2.1 设置windows环境变量2.2.2 命令行示例 三、gcc编译过程详解3.1 程序编译4步骤3.2 gcc的使用方法3.2.1 gcc使用示例3.2.2…

SQL-Server链接服务器访问Oracle数据

SQL Server 链接服务器访问 Oracle 离线安装 .NET Framework 3.5 方法一&#xff1a;使用 NetFx3.cab 文件 下载 NetFx3.cab 文件&#xff0c;并将其放置在 Windows 10 系统盘的 C:Windows 文件夹中。 以管理员身份运行命令提示符&#xff0c;输入以下命令并回车&#xff1a; …

关于easy-es对时间范围查询遇到的小bug

前言&#xff1a;在使用easy-es之前作为一个小白的我只有es原生查询的基础&#xff0c;在自己通过查看官方文档自学easy-es遇到了一个挫折&#xff0c;其他的还好语法和MybatisPlus差不多&#xff0c;正以为我觉得很快就能入手&#xff0c;在对时间范围的判断就给我当头一棒&am…

typora+picgo core+minio自动上传图片

1. 在服务器上安装docker版本minio 创建/docker/minio文件夹 mkdir -p /docker/minio在此文件夹创建docker-compose.yml version: "3.5" services:minio:image: quay.io/minio/minio:latestcontainer_name: minioprivileged: truerestart: alwaysports:# API接口访…

WebRTC线程的启动与运行

WebRTC线程运行的基本逻辑&#xff1a; while(true) {…Get(&msg, …);…Dispatch(&msg);… }Dispatch(Message *pmsg) {…pmsg->handler->OnMessage(pmsg);… }在执行函数内部&#xff0c;就是一个while死循环&#xff0c;只做两件事&#xff0c;从队列里Get取…

【OceanBase】使用 Superset 连接 OceanBase 数据库并进行数据可视化分析

文章目录 前言一、前提条件二、操作步骤2.1 准备云主机实例2.2 安装docker-compose2.3 使用docker-compose安装Superset2.3.1 克隆 Superset 的 GitHub 存储库2.3.2 通过 Docker Compose 启动 Superset 2.4 开通 OB Cloud 云数据库2.5 获取连接串2.6 使用 Superset 连接 OceanB…

我们能否使用 ANSYS SPEOS 测量水质?

介绍 Ansys SPEOS 是动态环境科学领域的尖端工具&#xff0c;可为围绕水质管理的复杂问题提供深入的见解和创新解决方案。通过其光学系统功能&#xff0c;它为理解和改善不同环境的生态动态提供了一个强大的框架。 主要特点和优势 多材质建模&#xff1a; 为了准确模拟环境…

简易屏幕共享工具-基于WebSocket

前面写了两个简单的屏幕共享工具&#xff0c;不过那只是为了验证通过截屏的方式是否可行&#xff0c;因为通常手动截屏的频率很低&#xff0c;而对于视频来说它的帧率要求就很高了&#xff0c;至少要一秒30帧率左右。所以&#xff0c;经过实际的截屏工具验证&#xff0c;我了解…

论文分享 | PromptFuzz:用于模糊测试驱动程序生成的提示模糊测试

大语言模型拥有的强大能力可以用来辅助多种工作&#xff0c;但如何有效的辅助仍然需要人的精巧设计。分享一篇发表于2024年CCS会议的论文PromptFuzz&#xff0c;它利用模型提示生成模糊测试驱动代码&#xff0c;并将代码片段嵌入到LLVM框架中执行模糊测试。 论文摘要 制作高质…