Wend看源码-DataX

官方文档

GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。

简介

       DataX 是由阿里巴巴开源的一款功能强大的 ETL 工具。所谓 ETL,具体涵盖了 Extract(抽取)、Transform(转换)以及 Load(加载)这三个关键环节,指的是把数据从各式各样的数据源中抽取出来,随后经过数据清洗、格式转换等一系列处理操作,最终加载到诸如数据仓库、数据库这类目标存储系统当中的完整过程。而 ETL 工具的核心价值就在于能够自动化这一复杂的数据处理流程,为企业的数据整合与应用提供极大便利。

        DataX 运用了 Plugin + FrameWork 的架构模式。其插件(Plugin)的实现是基于 ClassLoader 的插件热插拔原理,通过破坏 Class 加载的双亲委派机制,运用动态加载 jar 包的方法,达成了插件热插拔的效果。下文将会针对 DataX 的插件热插拔机制展开更为详尽的阐述。

核心流程

   图2 datax 核心流程泳道图

Engine.start

  1. column 字段格式转换:在这个步骤中,对 column 字段进行格式转换。

  2. 初始化插件参数配置 pluginRegisterCenter,其中插件类型包括reader、transformer和writer三种插件类型

  3. 初始化Container :Container 是执行容器的抽象类分为JobContainer 和 TaskGroupContainer

  4. 初始化 Configuration:Configuration 包含每个任务的信息配置

  5. 初始化PerfTrace:PerfTrace是 DataX中的链路追踪对象

  6. Container.start :具体容器的运行

JobContainer.start

  1. preCheck :进行预处理前的检查工作。

  2. preHandle:处理前的操作流程

  3. init :容器初始化操作

  4. prepare :进行准备操作,触发 reader 和 writer 的 prepare 方法。

  5. split 拆分:根据配置计算出 “需要的通道数”,“task 任务数量”,“totalStage”。

  6. post :触发 reader 和 writer 的 post 方法,进行一些后续的清理、资源释放或者补充性的数据处理等操作。

  7. postHandle :对整个任务执行情况做最后的处理和总结。

  8. invokeHooks :调用外部 hooks,进行监控与汇报等工作。

TaskGroupContainer.start

  1. 参数准备:准备参数,包括检查时间间隔、状态汇报时间间隔和通道数量等。

  2. 注册 communication :communication包含状态及统计信息交互类

  3. 处理失败的任务重试机制是否重试:判断处理失败的任务是否需要重试。

  4. 删除未执行完毕的TaskExecutor 集合

  5. 处理失败的任务,根据重试机制判断是否重新加入任务列表

  6. 删除执行完毕的任务

  7. 失败的任务汇报错误

  8. 根据配置生成 TaskExecutor 集合

  9. TaskExecutor.doStart:启动 TaskExecutor。

  10. reportTaskGroupCommunication 汇报任务

new TaskExecutor

  1. 初始化 taskExcutor

  2. 根据配置生成不同的 Transformer

  3. 根据配置生成 writer 线程

  4. 根据配置生成 reader 线程

TaskExecutor.doStart

  1. 启动 writer 线程:启动 writer 线程。

  2. 启动 reader 线程:启动 reader 线程。

DataX 插件热插拔机制

类加载器(ClassLoader)

        在 Java 中,类加载器是一个负责加载类文件(.class文件)的组件。它是 Java 运行时环境(JRE)的一部分,用于将字节码文件加载到 Java 虚拟机(JVM)的内存中,使得 JVM 能够执行这些类所定义的代码。

类加载器的实际应用场景
        热部署
  • 在一些应用服务器(如 Tomcat)中,利用自定义类加载器来实现热部署功能。当应用程序的代码发生变化时,通过重新加载更新后的类而不需要重启整个服务器。例如,在开发 Web 应用时,修改了一个 Servlet 类,服务器可以通过自定义类加载器重新加载这个 Servlet,使得修改能够立即生效。

        插件(Plugin)架构
  • 可以使用自定义类加载器来加载插件中的类。这样,插件可以被动态地加载和卸载,而不会影响主应用程序的运行。Datax 就是利用自定义的ClassLoader 加载对应的reader和 writer 的jar 包,以实现通过配置动态加载的效果。

双亲委派

  • 这是 Java 类加载器的一个重要特性。当一个类加载器收到加载类的请求时,它首先会把这个请求委派给它的父类加载器。只有当父类加载器无法完成加载任务(找不到类)时,子类加载器才会尝试自己加载。例如,当应用程序类加载器收到加载java.util.Date类的请求时,它会先委派给扩展类加载器,扩展类加载器又会委派给引导类加载器。因为引导类加载器可以加载java.util.Date(它属于核心库),所以就由引导类加载器完成加载,而应用程序类加载器和扩展类加载器就不需要再进行加载操作了。

  • 双亲委派模型的优点是可以保证 Java 核心类库的安全性和一致性。例如,它可以防止用户自定义的类覆盖 Java 核心类库中的类。如果没有这种机制,用户可能会编写一个名为java.lang.Object的类,这会导致整个 Java 系统的混乱。通过双亲委派模型,即使有这样的类存在,由于类加载器会先委派给引导类加载器加载真正的java.lang.Object,用户自定义的同名类就不会被加载。

DataX 破坏双亲委派机制以实现Plugin机制

        DataX 的插件机制是破坏了双亲委派机制。主要有三个原因:插件隔离的需求,插件动态加载的需求,加载非标准位置插件的需求。

  • 插件隔离的需求:DataX 需要对不同数据源的插件进行隔离。因为不同数据源的插件可能会包含同名的类,但这些类在语义和功能上是不同的,用于适配不同数据源的特定需求。例如,MySQL 插件和 Oracle 插件可能都有一个名为DataReader的类,但它们的实现细节完全不同,用于从各自的数据源读取数据。如果遵循双亲委派机制,当加载DataReader类时,可能会出现冲突,无法正确区分是哪个数据源插件对应的类。通过破坏双亲委派机制,DataX 为每个插件使用独立的类加载器,确保了插件之间的类隔离。

  • 插件动态加载的需求:DataX 插件可能需要动态加载和更新。在实际应用中,可能会根据业务需求添加新的数据源插件或者更新现有的插件。如果采用双亲委派机制,已加载的类很难被更新,因为类加载器一旦加载了一个类,后续相同全限定名的类请求会直接返回已加载的类。DataX 通过破坏双亲委派机制,能够使用自定义类加载器根据插件的更新情况,灵活地重新加载插件类,实现插件的动态加载和更新。

  • 加载非标准位置插件的需求:DataX 插件可能存储在非标准的位置,例如通过自定义的插件仓库或者网络位置存储插件。这些插件的加载位置不符合双亲委派机制下标准类加载器(如引导类加载器、扩展类加载器和应用程序类加载器)的加载路径。通过破坏双亲委派机制,DataX 可以利用自定义类加载器直接从这些特殊位置加载插件类,以满足插件的加载需求。

DataX transformer 模块详解

        DataX 的Transformer 包含以下几种类型:SubstrTransformer,PadTransformer,ReplaceTransformer,GroovyTransformer,DigestTransformer

SubstrTransformer(字符串截取转换)

        主要用于从原始数据的字符串中提取指定位置和长度的子字符串。这在数据清洗和转换过程中非常有用,例如,当数据源中的某个字段包含了过多的冗余信息,只需要其中的一部分时,就可以使用 SubstrTransformer 来提取有用的部分。

        DataX中使用SubstrTranformer 的标识符为 dx_substr

dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"
dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

PadTransformer(填充转换)

        用于对字符串进行填充操作。它可以在字符串的左侧或右侧添加指定的字符,达到规定的长度。这种转换在需要统一数据格式长度的场景中很常见,比如金融数据中,为了使账户号码等数据格式统一,便于存储和比较。

        DataX中使用PadTranformer 的标识符为 dx_pad

dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

ReplaceTransformer(替换转换)

        主要功能是将原始数据中的指定字符串替换为新的字符串。在处理数据中的错误标记、旧的编码格式或者不需要的字符等情况时非常有用。

          DataX中使用ReplaceTranformer 的标识符为 dx_replace

dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"
dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"datax****"

FilterTransformer(过滤转换)

        用于对数据进行筛选过滤。它可以根据设定的条件,决定是否保留数据。这在去除不需要的数据,如噪声数据、不符合业务规则的数据等场景中很重要。

         DataX中使用FilterTranformer 的标识符为 dx_filter

dx_filter(1,"like","dataTest")  
dx_filter(1,">=","10") 

GroovyTransformer(基于 Groovy 脚本的转换)

        Groovy 是一种动态语言,GroovyTransformer 允许用户通过编写 Groovy 脚本来实现复杂的数据转换逻辑。这种方式非常灵活,可以处理各种复杂的情况,比如根据多个字段的值进行计算、根据复杂的业务规则进行数据重组等。

        DataX中使用GroovyTranformer 的标识符为 dx_groovy

groovy 实现的subStr:String code = "Column column = record.getColumn(1);\n" +" String oriValue = column.asString();\n" +" String newValue = oriValue.substring(0, 3);\n" +" record.setColumn(1, new StringColumn(newValue));\n" +" return record;";dx_groovy(record);

DigestTransformer(摘要转换)

        通常用于生成数据的摘要信息,如哈希值。这在数据安全、数据验证等场景中很有用。例如,为了验证数据在传输过程中是否被篡改,可以在传输前对数据生成哈希值,在接收后再次生成哈希值并进行比较。

        DataX中使用DisgestTranformer 的标识符为 dx_digest

dx_digest(1,"md5","toUpperCase"), column 1的值为 xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2

DataX transformer 模块重构思路

当前Transformer 模块的缺点(个人观点)

        目前我认为DataX Transformer 模块的最大缺点就是JSON配置化太过复杂,且集中于一个文件中,导致用户阅读和使用不便;

        同时Transformer 模块目前不够灵活,只能支持简单的转换需求。我认为Transformer模块的设计可以支持SPI 机制,以保证用户可以自定义Transformer 来实现某些定制化的需求。

Transformer 重构架构

        我的Transformer 架构采用的是文档即代码的方式,用户编辑的配置文件不仅仅是配置,而且是适用于用户阅读的文档。Transformer 配置以更加清晰的文档展现,这样后续的Transformer 规则配置的修改就不再需要开发人员进行,而是可以转移到运维人员或者是用户自己手动修改。

        首先我们需要将Transformer 配置从job.json中独立出来,形成三个新的文档型配置文件:字段对应关系配置文件和值对应关系配置文件以及用户自定义Transform 配置文件。以下是Transformer 重构架构图。

        字段对应关系配置文件负责将reader 和writer 的字段进行匹配,比如reader 的字段是name,对应writer 的字段的 old_name,通过配置可以轻松完成转换。同时字段对应关系配置可以设定字段的转换类型,比如reader 中 jobno的字段类型是string,而writer 中jobno 的字段类型需要是 Integer。

        值对应关系配置文件负责处理抽取数据源和加载数据源的值转换工作,比如reader 中某个表包含字段 direction 方向,0 上行,1下行;而writer 中需要direction 方向的值为 1 上行,2下行;通过值对应关系配置文件,我们可以轻松实现这种值转换工作。同时值对应关系配置文件可以设定固定默认值,比如将checkFlag 字段设为固定默认值1。

        自定义Transform 配置文件主要描述用户的自定义SPI 实现的详细内容,是介绍自定义SPI 实现的详细设计文档。

        通过以上三种配置文件,我们将Transform 的配置从job.json 中分离出来,提高了配置的可读性和维护性。同时,新的配置方式可以更加良好的支持Transform 模块的功能,字段对应关系配置和值对应关系配置已经可以满足大部分的数据转换需求,而SPI 的扩展则可以轻松实现某些用户特定的需求。在这种文档即代码的架构方式中,配置文件的设计将是Transform 模块架构的重点,我将列取几个配置文件的模板以供感兴趣的小伙伴参考。

字段对应关系转换文件配置

{
"title":"字段对应关系转换文件配置",
"data":[
{
"old_column_name":"抽取表的字段名称",
"new_column_name":"加载表的字段名称",
"column_notes":"字段描述",
"column_explain":"字段含义解释"
},
{
"old_column_name":"reqResult",
"new_column_name":"bizReqResult",
"column_notes":"业务类型",
"column_explain":"0未处理 非零:处理完毕:1 同意 2拒绝"
},
{
"old_column_name":"initTime",
"new_column_name":"createTime",
"column_notes":"创建时间",
"column_explain":"创建时间",
"column_format":"yyyy-MM-dd"
}
]
}

值对应关系转换文件配置

{
"title":"值对应关系转换文件配置",
"data":[
{
"column_name":"字段列名",
"column_notes":"字段描述",
"column_old_explain":"抽取表的字段含义解释",
"column_new_explain":"加载表的字段含义解释",
"corresponding":"{"0":"2","1":"1","2":"0"}"
},
{
"column_name":"业务请求",
"column_notes":"bizReqResult",
"column_old_explain":"0未处理 非零:处理完毕:1 同意 2拒绝",
"column_new_explain":"0 不同意 1 同意 2 取消",
"corresponding":"{"0":"2","1":"1","2":"0"}"
},
{
"column_name":"字段列名",
"column_notes":"字段描述",
"column_explain":"字段含义",
"default":"默认值"
},
{
"column_name":"checkFlag",
"column_notes":"检查标识",
"column_explain":"默认不检查 0",
"default":"0"
}
]
}

固定位数的转换文件配置

{
"title":"固定位数的转换文件配置",
"data":[
{
"column_catLength":"字段裁剪位数",
"column_name":"转换的字段名称",
"column_index":"字段标记位数",
"column_notes":"字段描述",
"column_explain":"字段解释"
},
{
"column_catLength":10,
"column_name":"tradeType",
"column_index":0,
"column_notes":"交易性质",
"column_explain":"默认全0"
}
]
}

        如果追求更好的阅读性和维护性,可以将字段对应关系配置和值对应关系配置再独立划分出两个新的配置文件,字段类型转换配置文件和字段默认值配置文件。所有的配置以filterChan责任链模式的方式,采用管道过滤器的架构每个filter 对数据进行并行处理,完成后形成最终的Transform过滤数据提供给writer 插件进行加载。

参考文献

DataX教程(10)- DataX插件热插拔原理_datax plugin-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/488002.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js:代码架构组成与布局设置

前言:最近在弄一个开源的管理系统项目,前后端分离开发,这里对前端的Vue框架做一个总结,有遗漏和错误的地方欢迎大家指出~ 🏡个人主页:謬熙,欢迎各位大佬到访❤️❤️❤️~ 👲个人简介…

静态路由与交换机配置实验

1.建立网络拓扑 添加2台计算机,标签名为PC0、PC1;添加2台二层交换机2960,标签名为S0、S1;添加2台路由器2811,标签名为R0、R1;交换机划分的VLAN及端口根据如下拓扑图,使用直通线、DCE串口线连接…

与 Cursor AI 对话编程:2小时开发报修维修微信小程序

本文记录了如何通过与 Cursor AI 对话,全程不写一行代码的情况下,完成一个完整的报修小程序。整个过程展示了 AI 如何帮助我们: 生成代码 、解决问题、优化实现、完善细节。 先看一下效果图: 一、项目配置 首先我是这样和 AI 对…

汽车车牌标记支持YOLO,COCO,VOC三种格式标记,4000张图片的数据集

本数据集支持YOLO,COCO,VOC三种格式标记汽车车牌,无论是新能源汽车还是油车都能识别标记,该数据集一共包含4000张图片 数据集分割 4000总图像数 训练组 70% 2800图片 有效集 20% 800图片 测…

共享无人系统,便捷生活触手可得

共享无人系统适用各种无人场景:共享麻将室、共享茶室、共享健身房、共享自习室、共享桌球室,实现线上预约,一键预约,自由组合时间,智能通断电,智能语音提醒。 优惠券是常用的营销工具,后台创建之后发放给会…

使用HTML获取商品详情:技术实现与最佳实践

1. 引言 在电子商务领域,获取商品详情是提升用户体验和增强网站功能性的关键。本文将探讨如何使用HTML结合其他技术手段获取商品详情,并展示如何将这些信息有效地呈现给用户。 2. 理解商品详情页面的结构 在开始编码之前,我们需要了解商品…

怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev

本文引用怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev 在 vscode 设置项中配置 gopls 的 ui.navigation.importShortcut 为 Definition 即可。 "gopls": {"ui.navigation.importShortcut": "Definition" }ui.navigation.i…

从 Zuul 迁移到 Spring Cloud Gateway:一步步实现服务网关的升级

从 Zuul 迁移到 Spring Cloud Gateway:一步步实现服务网关的升级 迁移前的准备工作迁移步骤详解第一步:查看源码第二步:启动类迁移第三步:引入 Gateway 依赖第四步 编写bootstrap.yaml第五步:替换路由配置第六步&#…

【金融贷后】贷后运营精细化管理

文章目录 一、贷后专业术语讲解① 什么是贷后,贷后部是干什么的?② 贷后部门常见组织架构?③ 贷后专业术语有哪些? 二、贷后常用作业手段介绍① 贷后产品形态介绍?② 催收常用的方法? 三、贷后策略岗位介绍…

利用cnocr库完成中文扫描pdf文件的文字识别

很多pdf文件文字识别软件都会收费,免费的网页版可能会带来信息泄露,还有一些类似于腾讯AI和百度AI的接口都有调用次数限制,因此,利用识别正确率极高且免费的cnocr库来自己动手做个pdf文件文字识别程序就是一个很不错的选择。以下程…

论文阅读 -- IDENTIFYING THE RISKS OF LM AGENTS WITHAN LM-EMULATED SANDBOX, ICLR2024

论文链接:https://arxiv.org/pdf/2309.15817 目录 ABSTRACT 1 INTRODUCTION 2 BACKGROUND & PROBLEM STATEMENT 3 CONSTRUCTING TOOLEMU 3.1 EMULATING TOOL EXECUTIONS WITH LANGUAGE MODELS 3.2 DESIGNING AUTOMATIC EVALUATIONS WITH LANGUAGE MODEL…

期末复习-计算机网络篇SCAU

第一章:概述 1.计算机网络的特点,互联网发展的三个阶段 特点:连通性、资源共享 三个阶段: 1969-1990:从单个网络ARPANET向互联网发展 1985-1993:建成了三级结构的互联网 1993-现在:全球范…

TesseractOCR-GUI:基于WPF/C#构建TesseractOCR简单易用的用户界面

前言 前篇文章使用Tesseract进行图片文字识别介绍了如何安装TesseractOCR与TesseractOCR的命令行使用。但在日常使用过程中,命令行使用还是不太方便的,因此今天介绍一下如何使用WPF/C#构建TesseractOCR简单易用的用户界面。 普通用户使用 参照上一篇教…

openGauss开源数据库实战二十一

文章目录 任务二十一 使用JDBC访问openGauss数据库任务目标实施步骤一、准备工作 二、下载并安装JavaSE81 下载JavaSE8安装Java8SE并配置环境变量 三、下载并安装eclipse四、下载并安装openGauss的JDBC驱动包五、使用IDEA编写JDBC测试程序1 使用IDEA的SSH连接虚拟机2 创建项目并…

算法——前缀和

如果我们想要得到数组中一段区间的和最朴素的想法肯定是我们从区间的开始下标遍历到结束下标并累加,但是这显然存在一个问题,时间开销是O(n)的级别,并且有着大量的重复计算,求[n, m]的和后继续求[n…m…p]区…

可视化建模以及UML期末复习篇----UML图

这是一篇相对较长的文章,如你们所见,比较详细,全长两万字。我不建议你们一次性看完,直接跳目录找你需要的知识点即可。 --------欢迎各位来到我UML国! 一、UML图 总共有如下几种: 用例图(Use Ca…

Tableau数据可视化与仪表盘搭建

1.Tableau介绍 可视化功能 数据赋能 数据赋能就是将我们的数据看板发布到我们的线上去 这里的IP地址是业务部门可以通过账号密码登入的 我们也可以根据需要下载,选中并点击下载即可 下载下来之后,自己就能根据数据进行自定义的分析 也可以下载图片 还有…

NanoLog起步笔记-7-log解压过程初探

nonolog起步笔记-6-log解压过程初探 再看解压过程建立调试工程修改makefile添加新的launch项 注:重新学习nanolog的README.mdPost-Execution Log Decompressor 下面我们尝试了解,解压的过程,是如何得到文件头部的meta信息的。 再看解压过程 …

设计模式-装饰器模式(结构型)与责任链模式(行为型)对比,以及链式设计

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1.装饰器模式1.1概念1.2作用1.3应用场景1.4特点1.5类与对象关系1.6实现 2责任链模式2.1概念2.2作用2.3应用场景2.4特点2.5类与对象关系2.6实现 3.对比总结 前言…

llama-factory实战: 基于qwen2.5-7b 手把手实战 自定义数据集清洗 微调

基于qwen2.5 手把手实战 自定义数据集 微调(llama-factory) 准备工作1.数据集准备(例:民法典.txt)2.服务器准备(阿里云 DSW 白嫖)3.环境配置pip 升级模型下载微调助手 4.数据集处理脚本文件4.1文本分割(ber…