DataX实战|使用Python 构建简易的DataX数据血缘工具(一)

导读:

在这篇文章中,我介绍了如何使用 Python 构建简易的 DataX 数据血缘工具,以便解决 DataXWeb 在查询表上下游关系时的不足。
选择 Flask 作为框架,利用 DataXWeb 的元数据 job_info 表和 job_json,通过 JSON 解析与 sqllineage 的 SQL 解析能力提取出任务相关的库表信息。

1. 背景

由于我们在 ETL 时全部在 datax 上,在 datax 里写了大量复杂的 sql,导致在排查问题时困难重重,而唯一的可视化页面是 dataxweb。

如果我需要查询一个表的上下游,只能在 dataxweb 里去搜,而 dataxweb 只支持搜任务名,若任务名与表名相同,那算走运;若历史任务名比较特殊,则无从查起。

因此,需要这么一个工具,输入一个表,就能知道这个表出现在哪些 datax 任务中,不管他是 source 表还是 target 表。相当于一个简单的血缘,或者上下游。

那么,如何实现呢?人生苦短,我选 Python。

2. 框架选择

显然这是一个前后端结合的项目,在 python 中,常用的前端框架有两种:

  • flask
  • django

这两个如何选择呢?

2.1 flask VS django

2.1.1 项目规模与复杂度:

Flask:轻量级,适合小型项目、微服务及功能单一应用,如简易 API、小型博客,代码结构扁平,上手快。

Django:全栈式,自带诸多功能,契合大型、复杂项目,像电商网站这类有繁杂业务逻辑、多模型与权限管理需求的企业级应用,借助其完善架构与内置组件能高效搭建。

2.1.2 开发速度与灵活性:

Flask:灵活性强,可按需自选第三方库集成构建,开发速度依开发者对组件熟悉度而定,适合特殊或实验性场景。

Django:内置功能助力快速开发大型应用,ORM、管理界面等省却基础搭建精力,但项目结构较固定,遵循既定模式,限制部分灵活性,利于大型项目组织维护。

2.1.3 性能与资源占用;

Flask:自身轻量,资源占用少,在简单场景性能佳,不过性能优化多靠开发者引入第三方库实现。

Django:自带缓存、连接池等优化手段提升大型项目性能,但因功能丰富,资源占用相对多,小型项目需适当优化。

2.1.4 社区与文档支持:

Flask:社区活跃,文档简洁且示例多,便于初学者入门,也能助开发者解决常见问题、选用扩展。

Django:社区庞大、生态丰富,遇问题易获解答,还有海量插件可选;官方文档详尽全面,覆盖各层次开发知识需求。

基于以上,本次的小项目使用 flask 是比较快速的。

3. 整体思路

3.1 dataxweb 元数据

既然所有的信息都在 dataxweb 中,那么先观察一下 dataxweb 的元数据。

在这里插入图片描述

DataXWeb元数据

这是所有的表,见名之意,任务相关信息在 job_info 中。

观察一下 job_info 的 DDL

CREATE TABLE `job_info` (`id` int NOT NULL AUTO_INCREMENT,`job_group` int NOT NULL COMMENT '执行器主键ID',`job_cron` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务执行CRON',`job_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`project_id` int DEFAULT NULL COMMENT '所属项目id',`add_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,`user_id` int NOT NULL COMMENT '修改用户',`alarm_email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '报警邮件',`executor_route_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器路由策略',`executor_handler` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务handler',`executor_param` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '执行器任务参数',`executor_block_strategy` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '阻塞处理策略',`executor_timeout` int NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位分钟',`executor_fail_retry_count` int NOT NULL DEFAULT '0' COMMENT '失败重试次数',`glue_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'GLUE类型',`glue_source` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT 'GLUE源代码',`glue_remark` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'GLUE备注',`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',`child_jobid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',`trigger_status` tinyint NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',`trigger_last_time` bigint NOT NULL DEFAULT '0' COMMENT '上次调度时间',`trigger_next_time` bigint NOT NULL DEFAULT '0' COMMENT '下次调度时间',`job_json` text CHARACTER SET utf8 COLLATE utf8_general_ci COMMENT 'datax运行脚本',`replace_param` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '动态参数',`jvm_param` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'jvm参数',`inc_start_time` datetime DEFAULT NULL COMMENT '增量初始时间',`partition_info` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '分区信息',`last_handle_code` int DEFAULT '0' COMMENT '最近一次执行状态',`replace_param_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量时间格式',`reader_table` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'reader表名称',`primary_key` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量表主键',`inc_start_id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '增量初始id',`increment_type` tinyint DEFAULT '0' COMMENT '增量类型',`datasource_id` bigint DEFAULT NULL COMMENT '数据源id',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=600 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

这里有几个关键字段,

  • id,
  • job_desc 任务名,
  • project_id 所属项目,
  • trigger_status 调度状态:0-停止,1-运行 ,
  • job_json datax 运行脚本

3.2 job_json 分析

这里还是比较简单的,主体就是 job_json 。熟悉 datax 的朋友 应该了解,datax 主要是 json,分为 reader 和 writer 。大体上分为两种,

reader 里读整个表,
reader 里写复杂的 sql
第一种的 json 如下:

"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}

这种比较舒服,解析 json 就能把所有的信息获取。难点在于第二种。

"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","splitPk": "db_id","connection": [{"querySql": ["select * from table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}
}

就库表信息而言,都在 connection 里,第二种的难点在于 querySql 中有复杂的 sql,需要把 sql 解析出来 。(writer 都相似,不再赘述)

3.3 sql 解析

在 python 中 有一个解析 sql 的优秀项目叫 sqllineage 。官网地址[1]这里简单演示一下使用 。

from sqllineage.runner import LineageRunnersql = ''
with open('sql.sql', 'r', encoding='utf-8') as f:sql = f.read()
result = LineageRunner(sql)
print(result)

会得到如下结果

Source Tables:<default>.bd_a<default>.bd_b<default>.bd_c<default>.res_a<default>.res_c
Target Tables:

3.4 思路总结

至此,我们来理一下思路:

所有任务都在 dataxweb 的元数据 job_info 表里存在
datax 任务的主体在 job_info 里的 job_json 中
job_json 一种可以直接获取库、表信息,另一种写复杂 sql 的较为麻烦,但是可以用 sqllineage 来解决。
采用 flask 框架,url 中提供目标表,通过查询数据库和解析 json 的方式来找到和目标表相关的任务 。

下一篇,讲如何具体实现 ,以及实现过程的踩坑如何解决。

查询效果预览
在这里插入图片描述

第二篇文章在这里,含代码

▼ 关注「DataSpeed」,获取更多技术干货 ▼

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482247.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 14之HIDL转AIDL通信

Android 14之HIDL转AIDL通信 1、interface接口1.1 接口变更1.2 生成hidl2aidl工具1.3 执行hidl2aidl指令1.4 修改aidl的Android.bp文件1.5 创建路径1.6 拷贝生成的aidl到1和current1.7 更新与冻结版本1.8 编译模块接口 2、服务端代码适配hal代码修改2.1 修改Android.bp的hidl依…

51c视觉~YOLO~合集4

我自己的原文哦~ https://blog.51cto.com/whaosoft/12512597 1、Yolo8 1.1、检测PCB元件 技术世界正在以惊人的速度发展&#xff0c;而这种转变的核心是一个革命性的工具 — 计算机视觉。它最有趣的应用之一是电子印刷电路板 &#xff08;PCB&#xff09; 的检测和分析。本文…

基于树莓派的安保巡逻机器人--项目介绍

目录 一、项目简介 二、项目背景 三、作品研发技术方案 作品主要内容&#xff1a; 方案的科学性 设计的合理性 四、作品创新性及特点 五、作品自我评价 本篇为项目“基于树莓派的安保巡逻机器人”介绍博客 演示视频链接&#xff1a; 基于树莓派的安保巡逻机器人_音游…

多点DMALL启动招股:将在港交所上市,聚焦数字零售服务

近日&#xff0c;多点数智有限公司&#xff08;Dmall Inc.&#xff0c;下称“多点”或“多点DMALL”&#xff09;发布全球发售文件&#xff0c;于11月28日至12月3日招股&#xff0c;预计将于2024年12月6日在港交所主板挂牌上市。 招股书显示&#xff0c;多点DMALL本次全球发售的…

挑战用React封装100个组件【007】

项目地址 https://github.com/hismeyy/react-component-100 组件描述 今天的组件是用来展示聊天列表&#xff0c;或者论坛内容列表的组件。配合挑战006的时候开发的组件&#xff0c;可以显示用户的具体信息。 样式展示 前置依赖 今天&#xff0c;我分享的组件&#xff0c;需…

汉字Unicode编码相互转换API集成指南

汉字Unicode编码相互转换API集成指南 引言 在国际化的背景下&#xff0c;字符编码的统一变得尤为重要。Unicode作为一种通用字符集标准&#xff0c;能够支持全球几乎所有的语言文字&#xff0c;包括复杂的汉字系统。对于开发人员来说&#xff0c;掌握如何在不同的编码格式之间…

[Linux] 进程间通信——匿名管道命名管道

标题&#xff1a;[Linux] 进程间通信——匿名管道&&命名管道 水墨不写bug &#xff08;图片来源于网络&#xff09; 目录 一、进程间通信 二、进程间通信的方案——匿名管道 &#xff08;1&#xff09;匿名管道的原理 &#xff08;2&#xff09;使用匿名管道 三、进…

一体化数据安全平台uDSP 入选【年度创新安全产品 TOP10】榜单

近日&#xff0c;由 FreeBuf 主办的 FCIS 2024 网络安全创新大会在上海隆重举行。大会现场揭晓了第十届 WitAwards 中国网络安全行业年度评选获奖名单&#xff0c;该评选自 2015 年举办以来一直饱受赞誉&#xff0c;备受关注&#xff0c;评选旨在以最专业的角度和最公正的态度&…

pycharm链接neo4j(导入文件)

1.新建csv文件 2.写入文件 3.运行代码 import csv from py2neo import Graph, Node, Relationship# 连接到Neo4j数据库&#xff0c;使用Bolt协议 graph Graph("bolt://localhost:7687", auth("neo4j", "password"))# 读取CSV文件 with open(…

vscode ctrl+/注释不了css

方式一.全部禁用插件排查问题. 方式二.打开首选项的json文件,注释掉setting.json,排查是哪一行配置有问题. 我的最终问题:需要将 "*.vue": "vue",改成"*.vue": "html", "files.associations": { // "*.vue": &qu…

TCP三次握手与四次挥手(TCP重传机制,2MSL)超详细!!!计算机网络

本篇是关于3次握手和四次挥手的详细解释~ 如果对你有帮助&#xff0c;请点个免费的赞吧&#xff0c;谢谢汪。&#xff08;点个关注也可以&#xff01;&#xff09; 如果以下内容需要补充和修改&#xff0c;请大家在评论区多多交流~。 目录 1. TCP头部&#xff1a; 2. 三次握手…

单片机学习笔记 15. 串口通信(理论)

更多单片机学习笔记&#xff1a;单片机学习笔记 1. 点亮一个LED灯单片机学习笔记 2. LED灯闪烁单片机学习笔记 3. LED灯流水灯单片机学习笔记 4. 蜂鸣器滴~滴~滴~单片机学习笔记 5. 数码管静态显示单片机学习笔记 6. 数码管动态显示单片机学习笔记 7. 独立键盘单片机学习笔记 8…

C#中switch语句使用

编写一个程序&#xff0c;使用switch语句将用户输入的分数转换成等级&#xff0c;如表 private static void Main(string[] args) { Console.WriteLine("请输入分数&#xff1a;"); int score int.Parse(Console.ReadLine()); switch (score) …

[网络安全]sqli-labs Less-5 解题详析

[网络安全]Less-5 GET - Double Injection - Single quotes - String:双注入GET单引号字符型注入 判断注入类型判断注入点个数查库名&#xff08;爆破&#xff09; left函数抓包查库名&#xff08;双查询注入&#xff09; 原理实例查库名&#xff08;extractvalue函数&#xff…

pyspark实现基于协同过滤的电影推荐系统

最近在学一门大数据的课&#xff0c;课程要求很开放&#xff0c;任意做一个大数据相关的项目即可&#xff0c;不知道为什么我就想到推荐算法&#xff0c;一直到着手要做之前还没有新的更好的来代替&#xff0c;那就这个吧。 推荐算法 推荐算法的发展由来已久&#xff0c;但和…

python股票数据分析(Pandas)练习

需求&#xff1a; 使用pandas读取一个CSV文件&#xff0c;文件内容包括股票名称、价格和交易量。完成以下任务&#xff1a; 找出价格最高的股票&#xff1b; 计算总交易量&#xff1b; 绘制价格折线图。 代码实现&#xff1a; import pandas as pd import matplotlib.pyplot …

利用Python爬虫精准获取淘宝商品详情的深度解析

在数字化时代&#xff0c;数据的价值日益凸显&#xff0c;尤其是在电子商务领域。淘宝作为中国最大的电商平台之一&#xff0c;拥有海量的商品数据&#xff0c;对于研究市场趋势、分析消费者行为等具有重要意义。本文将详细介绍如何使用Python编写爬虫程序&#xff0c;精准获取…

K8s调度器扩展(scheduler)

1.K8S调度器 筛选插件扩展 为了熟悉 K8S调度器扩展步骤&#xff0c;目前只修改 筛选 插件 准备环境&#xff08;到GitHub直接下载压缩包&#xff0c;然后解压&#xff0c;解压要在Linux系统下完成&#xff09; 2. 编写调度器插件代码 在 Kubernetes 源代码目录下编写调度插件…

领养我的宠物:SpringBoot开发指南

第2章 开发环境与技术 本章节对开发宠物领养系统需要搭建的开发环境&#xff0c;还有宠物领养系统开发中使用的编程技术等进行阐述。 2.1 Java语言 Java语言是当今为止依然在编程语言行业具有生命力的常青树之一。Java语言最原始的诞生&#xff0c;不仅仅是创造者感觉C语言在编…

Permute for Mac 媒体文件格式转换软件 安装教程【音视频图像文件转换,简单操作,轻松转换,提高效率】

Mac分享吧 文章目录 Permute for Mac 格式转换软件 效果图展示一、Permute 格式转换软件 Mac电脑版——v3.11.15⚠️注意事项&#xff1a;1️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件2.1 左侧安装包拖入右侧文件夹中&#xff0c;等待安装完成&#xff0c;运行软件2.2…