官方文档
GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。
简介
DataX 是由阿里巴巴开源的一款功能强大的 ETL 工具。所谓 ETL,具体涵盖了 Extract(抽取)、Transform(转换)以及 Load(加载)这三个关键环节,指的是把数据从各式各样的数据源中抽取出来,随后经过数据清洗、格式转换等一系列处理操作,最终加载到诸如数据仓库、数据库这类目标存储系统当中的完整过程。而 ETL 工具的核心价值就在于能够自动化这一复杂的数据处理流程,为企业的数据整合与应用提供极大便利。
DataX 运用了 Plugin + FrameWork 的架构模式。其插件(Plugin)的实现是基于 ClassLoader 的插件热插拔原理,通过破坏 Class 加载的双亲委派机制,运用动态加载 jar 包的方法,达成了插件热插拔的效果。下文将会针对 DataX 的插件热插拔机制展开更为详尽的阐述。
核心流程
图2 datax 核心流程泳道图
Engine.start
-
column 字段格式转换:在这个步骤中,对 column 字段进行格式转换。
-
初始化插件参数配置 pluginRegisterCenter,其中插件类型包括reader、transformer和writer三种插件类型
-
初始化Container :Container 是执行容器的抽象类分为JobContainer 和 TaskGroupContainer
-
初始化 Configuration:Configuration 包含每个任务的信息配置
-
初始化PerfTrace:PerfTrace是 DataX中的链路追踪对象
-
Container.start :具体容器的运行
JobContainer.start
-
preCheck :进行预处理前的检查工作。
-
preHandle:处理前的操作流程
-
init :容器初始化操作
-
prepare :进行准备操作,触发 reader 和 writer 的 prepare 方法。
-
split 拆分:根据配置计算出 “需要的通道数”,“task 任务数量”,“totalStage”。
-
post :触发 reader 和 writer 的 post 方法,进行一些后续的清理、资源释放或者补充性的数据处理等操作。
-
postHandle :对整个任务执行情况做最后的处理和总结。
-
invokeHooks :调用外部 hooks,进行监控与汇报等工作。
TaskGroupContainer.start
-
参数准备:准备参数,包括检查时间间隔、状态汇报时间间隔和通道数量等。
-
注册 communication :communication包含状态及统计信息交互类
-
处理失败的任务重试机制是否重试:判断处理失败的任务是否需要重试。
-
删除未执行完毕的TaskExecutor 集合
-
处理失败的任务,根据重试机制判断是否重新加入任务列表
-
删除执行完毕的任务
-
失败的任务汇报错误
-
根据配置生成 TaskExecutor 集合
-
TaskExecutor.doStart:启动 TaskExecutor。
-
reportTaskGroupCommunication 汇报任务
new TaskExecutor
-
初始化 taskExcutor
-
根据配置生成不同的 Transformer
-
根据配置生成 writer 线程
-
根据配置生成 reader 线程
TaskExecutor.doStart
-
启动 writer 线程:启动 writer 线程。
-
启动 reader 线程:启动 reader 线程。
DataX 插件热插拔机制
类加载器(ClassLoader)
在 Java 中,类加载器是一个负责加载类文件(.class
文件)的组件。它是 Java 运行时环境(JRE)的一部分,用于将字节码文件加载到 Java 虚拟机(JVM)的内存中,使得 JVM 能够执行这些类所定义的代码。
类加载器的实际应用场景
热部署
-
在一些应用服务器(如 Tomcat)中,利用自定义类加载器来实现热部署功能。当应用程序的代码发生变化时,通过重新加载更新后的类而不需要重启整个服务器。例如,在开发 Web 应用时,修改了一个 Servlet 类,服务器可以通过自定义类加载器重新加载这个 Servlet,使得修改能够立即生效。
插件(Plugin)架构
-
可以使用自定义类加载器来加载插件中的类。这样,插件可以被动态地加载和卸载,而不会影响主应用程序的运行。Datax 就是利用自定义的ClassLoader 加载对应的reader和 writer 的jar 包,以实现通过配置动态加载的效果。
双亲委派
-
这是 Java 类加载器的一个重要特性。当一个类加载器收到加载类的请求时,它首先会把这个请求委派给它的父类加载器。只有当父类加载器无法完成加载任务(找不到类)时,子类加载器才会尝试自己加载。例如,当应用程序类加载器收到加载
java.util.Date
类的请求时,它会先委派给扩展类加载器,扩展类加载器又会委派给引导类加载器。因为引导类加载器可以加载java.util.Date
(它属于核心库),所以就由引导类加载器完成加载,而应用程序类加载器和扩展类加载器就不需要再进行加载操作了。 -
双亲委派模型的优点是可以保证 Java 核心类库的安全性和一致性。例如,它可以防止用户自定义的类覆盖 Java 核心类库中的类。如果没有这种机制,用户可能会编写一个名为
java.lang.Object
的类,这会导致整个 Java 系统的混乱。通过双亲委派模型,即使有这样的类存在,由于类加载器会先委派给引导类加载器加载真正的java.lang.Object
,用户自定义的同名类就不会被加载。
DataX 破坏双亲委派机制以实现Plugin机制
DataX 的插件机制是破坏了双亲委派机制。主要有三个原因:插件隔离的需求,插件动态加载的需求,加载非标准位置插件的需求。
-
插件隔离的需求:DataX 需要对不同数据源的插件进行隔离。因为不同数据源的插件可能会包含同名的类,但这些类在语义和功能上是不同的,用于适配不同数据源的特定需求。例如,MySQL 插件和 Oracle 插件可能都有一个名为
DataReader
的类,但它们的实现细节完全不同,用于从各自的数据源读取数据。如果遵循双亲委派机制,当加载DataReader
类时,可能会出现冲突,无法正确区分是哪个数据源插件对应的类。通过破坏双亲委派机制,DataX 为每个插件使用独立的类加载器,确保了插件之间的类隔离。 -
插件动态加载的需求:DataX 插件可能需要动态加载和更新。在实际应用中,可能会根据业务需求添加新的数据源插件或者更新现有的插件。如果采用双亲委派机制,已加载的类很难被更新,因为类加载器一旦加载了一个类,后续相同全限定名的类请求会直接返回已加载的类。DataX 通过破坏双亲委派机制,能够使用自定义类加载器根据插件的更新情况,灵活地重新加载插件类,实现插件的动态加载和更新。
-
加载非标准位置插件的需求:DataX 插件可能存储在非标准的位置,例如通过自定义的插件仓库或者网络位置存储插件。这些插件的加载位置不符合双亲委派机制下标准类加载器(如引导类加载器、扩展类加载器和应用程序类加载器)的加载路径。通过破坏双亲委派机制,DataX 可以利用自定义类加载器直接从这些特殊位置加载插件类,以满足插件的加载需求。
DataX transformer 模块详解
DataX 的Transformer 包含以下几种类型:SubstrTransformer,PadTransformer,ReplaceTransformer,GroovyTransformer,DigestTransformer。
SubstrTransformer(字符串截取转换)
主要用于从原始数据的字符串中提取指定位置和长度的子字符串。这在数据清洗和转换过程中非常有用,例如,当数据源中的某个字段包含了过多的冗余信息,只需要其中的一部分时,就可以使用 SubstrTransformer 来提取有用的部分。
DataX中使用SubstrTranformer 的标识符为 dx_substr
dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe" dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test"
PadTransformer(填充转换)
用于对字符串进行填充操作。它可以在字符串的左侧或右侧添加指定的字符,达到规定的长度。这种转换在需要统一数据格式长度的场景中很常见,比如金融数据中,为了使账户号码等数据格式统一,便于存储和比较。
DataX中使用PadTranformer 的标识符为 dx_pad
dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
ReplaceTransformer(替换转换)
主要功能是将原始数据中的指定字符串替换为新的字符串。在处理数据中的错误标记、旧的编码格式或者不需要的字符等情况时非常有用。
DataX中使用ReplaceTranformer 的标识符为 dx_replace
dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est" dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"datax****"
FilterTransformer(过滤转换)
用于对数据进行筛选过滤。它可以根据设定的条件,决定是否保留数据。这在去除不需要的数据,如噪声数据、不符合业务规则的数据等场景中很重要。
DataX中使用FilterTranformer 的标识符为 dx_filter
dx_filter(1,"like","dataTest") dx_filter(1,">=","10")
GroovyTransformer(基于 Groovy 脚本的转换)
Groovy 是一种动态语言,GroovyTransformer 允许用户通过编写 Groovy 脚本来实现复杂的数据转换逻辑。这种方式非常灵活,可以处理各种复杂的情况,比如根据多个字段的值进行计算、根据复杂的业务规则进行数据重组等。
DataX中使用GroovyTranformer 的标识符为 dx_groovy
groovy 实现的subStr:String code = "Column column = record.getColumn(1);\n" +" String oriValue = column.asString();\n" +" String newValue = oriValue.substring(0, 3);\n" +" record.setColumn(1, new StringColumn(newValue));\n" +" return record;";dx_groovy(record);
DigestTransformer(摘要转换)
通常用于生成数据的摘要信息,如哈希值。这在数据安全、数据验证等场景中很有用。例如,为了验证数据在传输过程中是否被篡改,可以在传输前对数据生成哈希值,在接收后再次生成哈希值并进行比较。
DataX中使用DisgestTranformer 的标识符为 dx_digest
dx_digest(1,"md5","toUpperCase"), column 1的值为 xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2
DataX transformer 模块重构思路
当前Transformer 模块的缺点(个人观点)
目前我认为DataX Transformer 模块的最大缺点就是JSON配置化太过复杂,且集中于一个文件中,导致用户阅读和使用不便;
同时Transformer 模块目前不够灵活,只能支持简单的转换需求。我认为Transformer模块的设计可以支持SPI 机制,以保证用户可以自定义Transformer 来实现某些定制化的需求。
Transformer 重构架构
我的Transformer 架构采用的是文档即代码的方式,用户编辑的配置文件不仅仅是配置,而且是适用于用户阅读的文档。Transformer 配置以更加清晰的文档展现,这样后续的Transformer 规则配置的修改就不再需要开发人员进行,而是可以转移到运维人员或者是用户自己手动修改。
首先我们需要将Transformer 配置从job.json中独立出来,形成三个新的文档型配置文件:字段对应关系配置文件和值对应关系配置文件以及用户自定义Transform 配置文件。以下是Transformer 重构架构图。
字段对应关系配置文件负责将reader 和writer 的字段进行匹配,比如reader 的字段是name,对应writer 的字段的 old_name,通过配置可以轻松完成转换。同时字段对应关系配置可以设定字段的转换类型,比如reader 中 jobno的字段类型是string,而writer 中jobno 的字段类型需要是 Integer。
值对应关系配置文件负责处理抽取数据源和加载数据源的值转换工作,比如reader 中某个表包含字段 direction 方向,0 上行,1下行;而writer 中需要direction 方向的值为 1 上行,2下行;通过值对应关系配置文件,我们可以轻松实现这种值转换工作。同时值对应关系配置文件可以设定固定默认值,比如将checkFlag 字段设为固定默认值1。
自定义Transform 配置文件主要描述用户的自定义SPI 实现的详细内容,是介绍自定义SPI 实现的详细设计文档。
通过以上三种配置文件,我们将Transform 的配置从job.json 中分离出来,提高了配置的可读性和维护性。同时,新的配置方式可以更加良好的支持Transform 模块的功能,字段对应关系配置和值对应关系配置已经可以满足大部分的数据转换需求,而SPI 的扩展则可以轻松实现某些用户特定的需求。在这种文档即代码的架构方式中,配置文件的设计将是Transform 模块架构的重点,我将列取几个配置文件的模板以供感兴趣的小伙伴参考。
字段对应关系转换文件配置
{
"title":"字段对应关系转换文件配置",
"data":[
{
"old_column_name":"抽取表的字段名称",
"new_column_name":"加载表的字段名称",
"column_notes":"字段描述",
"column_explain":"字段含义解释"
},
{
"old_column_name":"reqResult",
"new_column_name":"bizReqResult",
"column_notes":"业务类型",
"column_explain":"0未处理 非零:处理完毕:1 同意 2拒绝"
},
{
"old_column_name":"initTime",
"new_column_name":"createTime",
"column_notes":"创建时间",
"column_explain":"创建时间",
"column_format":"yyyy-MM-dd"
}
]
}
值对应关系转换文件配置
{
"title":"值对应关系转换文件配置",
"data":[
{
"column_name":"字段列名",
"column_notes":"字段描述",
"column_old_explain":"抽取表的字段含义解释",
"column_new_explain":"加载表的字段含义解释",
"corresponding":"{"0":"2","1":"1","2":"0"}"
},
{
"column_name":"业务请求",
"column_notes":"bizReqResult",
"column_old_explain":"0未处理 非零:处理完毕:1 同意 2拒绝",
"column_new_explain":"0 不同意 1 同意 2 取消",
"corresponding":"{"0":"2","1":"1","2":"0"}"
},
{
"column_name":"字段列名",
"column_notes":"字段描述",
"column_explain":"字段含义",
"default":"默认值"
},
{
"column_name":"checkFlag",
"column_notes":"检查标识",
"column_explain":"默认不检查 0",
"default":"0"
}
]
}
固定位数的转换文件配置
{
"title":"固定位数的转换文件配置",
"data":[
{
"column_catLength":"字段裁剪位数",
"column_name":"转换的字段名称",
"column_index":"字段标记位数",
"column_notes":"字段描述",
"column_explain":"字段解释"
},
{
"column_catLength":10,
"column_name":"tradeType",
"column_index":0,
"column_notes":"交易性质",
"column_explain":"默认全0"
}
]
}
如果追求更好的阅读性和维护性,可以将字段对应关系配置和值对应关系配置再独立划分出两个新的配置文件,字段类型转换配置文件和字段默认值配置文件。所有的配置以filterChan责任链模式的方式,采用管道过滤器的架构每个filter 对数据进行并行处理,完成后形成最终的Transform过滤数据提供给writer 插件进行加载。
参考文献
DataX教程(10)- DataX插件热插拔原理_datax plugin-CSDN博客