DataX: Ⅱ

序言

这里使用的是master分支,因为官网上并没有release分支,所以先用master分支吧,可能会有问题cuiyaonan2000@163.com

参考资料:

  1. https://github.com/alibaba/DataX
  2. https://github.com/alibaba/DataX/blob/master/introduction.md    --插件说明文档
  3. https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

源码打包

  1. 首先下载 GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。代码
  2. 首先如果是JDK17则会报错,后来选择JDK1.8
  3. Datax的运行依赖于python所以需要安装python2或者python3,centos7自带的有python2.7.5
  4. 然后打包生成可执行的文件 mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  5. 成功后在根目录下的target中有相关的打包结果,如果包含所有Reader和Writer则打包会慢一点,但是还是有必要的

执行命令

在datax的bin目录下 

  1. python datax.py -r {YOUR_READER} -w {YOUR_WRITER}   该命令是显示对应的json模板,也可以直接从source或者reader的文档中查看
  2. python datax.py json文件   该命令就是执行对应的json文件

用例:Stream To Stream 

{"job": {"content": [{"reader": {"name": "streamreader","parameter": {"sliceRecordCount": 10,"column": [{"type": "long","value": "10"},{"type": "string","value": "hello,你好,世界-DataX"}]}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": 5}}}
}

执行结果

MysqlReader To Stream 

通过命令python datax.py -r mysqlreader -w streamwriter 查看相关的模板为

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the mysqlreader document:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and  usepython {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": [], "connection": [{"jdbcUrl": [], "table": []}], "password": "", "username": "", "where": ""}}, "writer": {"name": "streamwriter", "parameter": {"encoding": "", "print": true}}}], "setting": {"speed": {"channel": ""}}}
}

然后编辑该json

{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["Name","GroupName"], "connection": [{"jdbcUrl": ["jdbc:mysql://192.168.137.2:3306/test"], "table": ["employee"]}], "password": "root", "username": "root"}}, "writer": {"name": "streamwriter", "parameter": {"encoding": "", "print": true}}}], "setting": {"speed": {"channel": "1"}}}
}

自定义插件

从设计之初,DataX就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式:

  • 插件只需关心数据的读取或者写入本身。
  • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

作为插件开发人员,则需要关注两个问题----自定义插件要考虑的点感觉很简单啊cuiyaonan2000@163.com

  1. 数据源本身的读写数据正确性。
  2. 如何与框架沟通、合理正确地使用框架。

逻辑执行模型

插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:

  • JobJob是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。
  • TaskTask是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。
  • TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup
  • JobContainerJob执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker
  • TaskGroupContainerTaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。

简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现JobTask两部分逻辑

物理执行模型

框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式:

  • Standalone: 单进程运行,没有外部依赖。
  • Local: 单进程运行,统计信息、错误信息汇报到集中存储。
  • Distrubuted: 分布式多进程运行,依赖DataX Service服务。

当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。 JobContainerTaskGroupContainer运行在同一个进程内时,就是单机模式(StandaloneLocal);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。

编程接口

那么,JobTask的逻辑应是怎么对应到具体的代码中的?

首先,插件的入口类必须扩展ReaderWriter抽象类,并且实现分别实现JobTask两个内部抽象类,JobTask的实现必须是 内部类 的形式,原因见 加载原理 一节。

以Reader为例:

public class SomeReader extends Reader {public static class Job extends Reader.Job {@Overridepublic void init() {}@Overridepublic void prepare() {}@Overridepublic List<Configuration> split(int adviceNumber) {return null;}@Overridepublic void post() {}@Overridepublic void destroy() {}}public static class Task extends Reader.Task {@Overridepublic void init() {}@Overridepublic void prepare() {}@Overridepublic void startRead(RecordSender recordSender) {}@Overridepublic void post() {}@Overridepublic void destroy() {}}
}

Job接口功能如下:

  • init: Job对象初始化工作,此时可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分-----获取插件配置信息cuiyaonan2000@163.com
  • prepare: 全局准备工作,比如odpswriter清空目标表。
  • split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
  • post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
  • destroy: Job对象自身的销毁工作。

Task接口功能如下:

  • init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Jobsplit方法返回的配置列表中的其中一个。
  • prepare:局部的准备工作。
  • startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列。
  • startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。
  • post: 局部的后置工作。
  • destroy: Task象自身的销毁工作。

需要注意的是:

  • JobTask之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
  • preparepostJobTask中都存在,插件需要根据实际情况确定在什么地方执行操作。

插件定义

码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?

在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:

{
    "name": "mysqlwriter",
    "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
    "description": "Use Jdbc connect to database, execute insert sql.",
    "developer": "alibaba"
}

  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。
  • class: 入口类的全限定名称,框架通过反射插件入口类的实例。十分重要 。
  • description: 描述信息。
  • developer: 开发人员。

例如:

插件打包

打包后的位置跟目录其它插件一样,举例来说:

插件目录分为readerwriter子目录,读写插件分别存放。插件目录规范如下:

  • ${PLUGIN_HOME}/libs: 插件的依赖库。
  • ${PLUGIN_HOME}/plugin-name-version.jar: 插件本身的jar。
  • ${PLUGIN_HOME}/plugin.json: 插件描述文件。

尽管框架加载插件时,会把${PLUGIN_HOME}下所有的jar放到classpath,但还是推荐依赖库的jar和插件本身的jar分开存放

插件配置文件设计

DataX使用json作为配置文件的格式。一个典型的DataX任务配置如下:

{"job": {"content": [{"reader": {"name": "odpsreader","parameter": {"accessKey": "","accessId": "","column": [""],"isCompress": "","odpsServer": "","partition": [""],"project": "","table": "","tunnelServer": ""}},"writer": {"name": "oraclewriter","parameter": {"username": "","password": "","column": ["*"],"connection": [{"jdbcUrl": "","table": [""]}]}}}]}
}

DataX框架有core.json配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖core.json中的默认值。

配置中job.content.reader.parameter的value部分会传给Reader.Jobjob.content.writer.parameter的value部分会传给Writer.Job ,Reader.JobWriter.Job可以通过super.getPluginJobConf()来获取。----------就是我们自定义插件也要满足core.json的整体格式,在规定的key下编辑自己的插件属性cuiyaonan2000@163.com

DataX框架支持对特定的配置项进行RSA加密,例子中以*开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*的key来查询配置和操作配置项 。-------这个可定时有用的cuiyaonan2000@163.com

工具Configuration

为了简化对json的操作,DataX提供了简单的DSL配合Configuration类使用。---提供了一个解析Json的工具类

Configuration提供了常见的get带类型get带默认值getset等读写配置项的操作,以及clonetoJSON等方法。配置项读写操作都需要传入一个path做为参数,这个path就是DataX定义的DSL。语法有两条:

  1. 子map用.key表示,path的第一个点省略。
  2. 数组元素用[index]表示。

比如操作如下json:

{"a": {"b": {"c": 2},"f": [1,2,{"g": true,"h": false},4]},"x": 4
}

比如调用configuration.get(path)方法,当path为如下值的时候得到的结果为:

  • x4
  • a.b.c2
  • a.b.c.dnull
  • a.b.f[0]1
  • a.b.f[2].gtrue

注意,因为插件看到的配置只是整个配置的一部分。使用Configuration对象时,需要注意当前的根路径是什么。

更多Configuration的操作请参考ConfigurationTest.java

Channel

跟一般的生产者-消费者模式一样,Reader插件和Writer插件之间也是通过channel来实现数据的传输的。channel可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSenderchannel写入数据通过RecordReceiverchannel读取数据

channel中的一条数据为一个Record的对象,Record中可以放多个Column对象,这可以简单理解为数据库中的记录和列。

Record有如下方法(感觉最外层是个数组cuiyaonan2000@163.com):

public interface Record {// 加入一个列,放在最后的位置void addColumn(Column column);// 在指定下标处放置一个列void setColumn(int i, final Column column);// 获取一个列Column getColumn(int i);// 转换为json StringString toString();// 获取总列数int getColumnNumber();// 计算整条记录在内存中占用的字节数int getByteSize();
}
  1. 因为Record是一个接口,Reader插件首先调用RecordSender.createRecord()创建一个Record实例,然后把Column一个个添加到Record中。
  2. Writer插件调用RecordReceiver.getFromReader()方法获取Record,然后把Column遍历出来,写入目标存储中。当Reader尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()方法会阻塞直到有数据。如果传输已经结束,会返回nullWriter插件可以据此判断是否结束startWrite方法。-----------结束标致太够模糊了,没有固定的标识么cuiyaonan2000@163.com

Column的构造和操作,我们在《类型转换》一节介绍。

类型转换

为了规范源端和目的端类型转换操作,保证数据不失真,DataX支持六种内部数据类型:

  • Long:定点数(Int、Short、Long、BigInteger等)。
  • Double:浮点数(Float、Double、BigDecimal(无限精度)等)。
  • String:字符串类型,底层不限长,使用通用字符集(Unicode)。
  • Date:日期类型。
  • Bool:布尔值。
  • Bytes:二进制,可以存放诸如MP3等非结构化数据。

对应地,有DateColumnLongColumnDoubleColumnBytesColumnStringColumnBoolColumn六种Column的实现。

Column除了提供数据相关的方法外,还提供一系列以as开头的数据类型转换转换方法。

DataX的内部类型在实现上会选用不同的java类型:

内部类型实现类型备注
Datejava.util.Date
Longjava.math.BigInteger使用无限精度的大整数,保证不失真
Doublejava.lang.String用String表示,保证不失真
Bytesbyte[]
Stringjava.lang.String
Booljava.lang.Boolean

类型之间相互转换的关系如下:

from\toDateLongDoubleBytesStringBool
Date-使用毫秒时间戳不支持不支持使用系统配置的date/time/datetime格式转换不支持
Long作为毫秒时间戳构造Date-BigInteger转为BigDecimal,然后BigDecimal.doubleValue()不支持BigInteger.toString()0为false,否则true
Double不支持内部String构造BigDecimal,然后BigDecimal.longValue()-不支持直接返回内部String
Bytes不支持不支持不支持-按照common.column.encoding配置的编码转换为String,默认utf-8不支持
String按照配置的date/time/datetime/extra格式解析用String构造BigDecimal,然后取longValue()用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity按照common.column.encoding配置的编码转换为byte[],默认utf-8-"true"为true, "false"为false,大小写不敏感。其他字符串不支持
Bool不支持true1L,否则0Ltrue1.0,否则0.0不支持-

脏数据处理

目前主要有三类脏数据:

  1. Reader读到不支持的类型、不合法的值。
  2. 不支持的类型转换,比如:Bytes转换为Date
  3. 写入目标端失败,比如:写mysql整型长度超长。

Reader.TaskWriter.Task中,通过AbstractTaskPlugin.getTaskPluginCollector()可以拿到一个TaskPluginCollector,它提供了一系列collectDirtyRecord的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord方法,把被认为是脏数据的Record传入即可。

用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/144687.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5.wifi开发【智能家居:上】,开发准备:智能开关灯,智能采集温湿,智能调彩灯

一。wifi智能家居项目开发 【开发准备1】&#xff1a;继电器控制开发 1.智能开关 器件准备&#xff1a;wifi&#xff08;esp8266&#xff0c;使用CP2102&#xff09;继电器 结果&#xff1a; 2.继电器工作原理 &#xff08;1&#xff09;继电器是一种自动电气开关 &#xff…

代码随想录刷题笔记10——动态规划

动态规划理论基础 动态规划定义 动态规划&#xff0c;英文&#xff1a;Dynamic Programming&#xff0c;简称DP&#xff0c;如果某一问题有很多重叠子问题&#xff0c;使用动态规划是最有效的。 所以动态规划中每一个状态一定是由上一个状态推导出来的&#xff0c;这一点就区…

DHCP(自动分配ip地址实验案例)

目录 实验原理 案例 实验原理 DHCP 使用客户服务器方式&#xff0c;采用请求/应答方式工作。DHCP 基于 UDP 工作&#xff0c;DHCP服务器运行在67号端口&#xff0c;DHCP客户运行在68号端口。 DHCP的工作过程分为以下步骤&#xff1a; &#xff08;1&#xff09;DHCP服务器被…

【2023保研】双非上岸东南网安

个人情况 学校&#xff1a;henu 专业&#xff1a;信息安全 排名&#xff1a;1/66 英语&#xff1a;六级500 竞赛&#xff1a;蓝桥杯PB国一&#xff0c;ISCC国一&#xff0c;密码数学挑战赛国三&#xff0c;还有其他一些省级水奖 论文&#xff1a;一篇EI在投&#xff08;三作通…

python二维码识别tesseract

window安装tesseract 下载路径&#xff1a; https://digi.bib.uni-mannheim.de/tesseract/ 选择 双击安装在D:\sore\teeseract-OCR后&#xff1a; 配置环境变量 配置环境变量Path&#xff1a;D:\sore\teeseract-OCR 配置语言包的环境变量TESSDATA_PREFIX&#xff1a; D:\s…

ElementUI基本介绍及登录注册案例演示

目录 前言 一.简介 二.优缺点 三.Element完成登录注册 1. 环境配置及前端演示 1.1 安装Element-UI模块 1.2 安装axios和qs(发送get请求和post请求) 1.3 导入依赖 2 页面布局 2.1组件与界面 3.方法实现功能数据交互 3.1 通过方法进行页面跳转 3.2 axios发送get请求 …

chrome extensions mv3通过content scripts注入/获取原网站的window数据

开发插件的都知道插件的content scripts和top window只共享Dom不共享window和其他数据&#xff0c;如果想拿挂载在window的数据还有点难度&#xff0c;下面会通过事件的方式传递cs和top window之间的数据写一个例子 代码 manifest.json 这里只搞了2个js&#xff0c;content.…

什么是Promise链(Promise chaining)?它在异步编程中的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是 Promise 链&#xff1f;⭐ 异步编程中的作用⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、…

动态内存操作(2)

接上一篇文章http://t.csdn.cn/1ONDq&#xff0c;这次我们继续讲解关于动态内存的相关知识。 一、常见的动态内存错误 1.对NULL指针进行解引用操作 #include<stdio.h> #include<stdlib.h> #include<limits.h> int main() {int* p (int*)malloc(INT_MAX/4);…

map和set的具体用法 【C++】

文章目录 关联式容器键值对setset的定义方式set的使用 multisetmapmap的定义方式insertfinderase[]运算符重载map的迭代器遍历 multimap 关联式容器 关联式容器里面存储的是<key, value>结构的键值对&#xff0c;在数据检索时比序列式容器效率更高。比如&#xff1a;set…

什么是数学建模(mooc笔记)

什么是数学建模 前提&#xff1a;我们数学建模国赛计划选择C题&#xff0c;故希望老师的教学中侧重与C题相关性大的模型及其思想进行培训。之后的学习内容中希望涉及以下知识点&#xff1a; logistic回归相关知识点。如&#xff1a;用法、适用、限制范围等。精学数学建模中常…

【Vue2.0源码学习】生命周期篇-挂载阶段(mount)

文章目录 1. 前言2. 挂载阶段分析3. 总结 1. 前言 模板编译阶段完成之后&#xff0c;接下来就进入了挂载阶段&#xff0c;从官方文档给出的生命周期流程图中可以看到&#xff0c;挂载阶段所做的主要工作是创建Vue实例并用其替换el选项对应的DOM元素&#xff0c;同时还要开启对…

高德地图根据两点的经纬度计算两点之间的距离(修正版)

SQL语句可以用来计算两个经纬度之间的距离。下面是一个示例的SQL语句&#xff1a; SELECT id, ( 6371 * ACOS( COS( RADIANS( lat1 ) ) * COS( RADIANS( lat2 ) ) * COS( RADIANS( lng2 ) - RADIANS( lng1 ) ) SIN( RADIANS( lat1 ) ) * SIN( RADIANS( lat2 ) ) ) ) AS dista…

阿里巴巴K8S集成seata

正文 在K8S集成seata&#xff0c;官方配置 代码 apiVersion: v1 kind: Service metadata:name: seata-servernamespace: wmz-devlabels:k8s-app: seata-server spec:type: NodePortports:- port: 8091nodePort: 30091protocol: TCPname: httpselector:k8s-app: seata-server-…

实例讲解Spring boot动态切换数据源

前言 在公司的系统里&#xff0c;由于数据量较大&#xff0c;所以配置了多个数据源&#xff0c;它会根据用户所在的地区去查询那一个数据库&#xff0c;这样就产生了动态切换数据源的场景。 今天&#xff0c;就模拟一下在主库查询订单信息查询不到的时候&#xff0c;切换数据…

elementui 菜单选中优化

/** 父级菜单悬浮样式**/ .el-submenu__title:hover {color:#1890ff!important; } /** 父级菜单箭头悬浮样式**/ .el-submenu__title:hover>.el-submenu__icon-arrow{font-size: 13px!important;} /** 子菜单悬浮样式**/ .el-menu-item:hover{color:#1890ff!important; } /*…

什么是Jmeter ?Jmeter使用的原理步骤是什么?

1.1 什么是 JMeter Apache JMeter 是 Apache 组织开发的基于 Java 的压力测试工具。用于对软件做压力测试&#xff0c;它最初被设计用于 Web 应用测试&#xff0c;但后来扩展到其他测试领域。 它可以用于测试静态和动态资源&#xff0c;例如静态文件、Java 小服务程序、CGI 脚…

青藏高原1-km分辨率生态环境质量变化数据集(2000-2020)

青藏高原平均海拔4000米以上&#xff0c;人口1300万&#xff0c;是亚洲九大河流的源头&#xff0c;为超过15亿人口提供淡水、食物和其他生态系统服务&#xff0c;被誉为地球第三极和亚洲水塔。然而&#xff0c;在该地区的人与自然的关系的研究是有限的&#xff0c;尤其是在精细…

PgSQL-内核特性-TupleTableSlotOps

PgSQL-内核特性-TupleTableSlotOps 执行器中表达式结果、函数结果、投影结果等&#xff0c;各种结果都需要以元组的形式返回&#xff0c;所以PgSQL引入了一种通用格式保存数据&#xff1a;TupleTableSlot。PgSQL执行器将记录存储到“元组表”中在各个算子之间进行传递&#xff…

【神经网络可视化】 梯度上升,可视化工具,风格转移

可视化可以帮助我们更好的理解卷积网络每一层学到了什么&#xff0c;或者说每一个卷积核究竟学到了什么&#xff0c;他是怎么理解图像的 这种的话当我们神经网络结果不太好时&#xff0c;我们可以分析不好的原因 图片来源于李飞飞老师的内容 梯度上升方法做可视化 文章目录 …