DataX详解和架构介绍

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel


文章目录

  • 系列文章目录
  • DataX是什么?
  • DataX支持的数据源
  • DataX的框架设计
  • DataX核心架构
      • 核心模块介绍:
      • DataX调度流程:
  • DataX部署和配置


DataX是什么?

DataX是阿里开源的异构数据源离线同步工具。它致力于实现包括关系型数据库(如MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
在这里插入图片描述

DataX的设计理念是将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源时,只需要将此数据源对接到DataX,便能与已有的数据源实现无缝数据同步。

DataX的架构主要基于Framework + Plugin的设计模式。它将数据读取和写入抽象成为Reader和Writer插件,这些插件可以接入不同的数据源,实现数据的读取和写入操作。同时,DataX提供了丰富的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。

DataX的核心优势包括稳定性、高效性、易用性和扩展性。它经过长时间大规模生产环境的验证,能够保证数据同步的稳定性和可靠性;通过多线程、多进程、流式处理等技术手段,实现高效的数据同步;提供简单易用的配置方式,用户可以通过配置文件来定义数据源、目标端、同步策略等;支持丰富的插件体系,可以方便地扩展新的数据源和目标端。

此外,DataX还提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。同时,它还具有强劲的同步性能、健壮的容错机制以及极简的使用体验等特点。

总之,DataX是一个强大而灵活的数据同步工具,能够有效地解决异构数据源之间的数据同步问题。通过合理的配置和优化,它可以帮助用户实现高效、稳定、可靠的数据同步操作。


DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入 。

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
Kingbase读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADB
ADS
OSS读 、写
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
图数据库阿里云 GDB读 、写
Neo4j
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Cassandra读 、写
数仓数据存储StarRocks读 、写
ApacheDoris
ClickHouse读 、写
Databend
Hive读 、写
kudu
selectdb
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写
TDengine读 、写

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

DataX的框架设计

datax_framework_new
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX核心架构

DataX 3.0采用微内核架构模式, 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
datax_arch

核心模块介绍:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

DataX调度流程:

DataX的调度流程可以分为以下几个步骤:

  • Job切分:首先,DataX的Job模块会根据分库分表策略将Job切分成若干个小的Task。这是为了确保每个Task可以独立执行,并且可以并发执行以提高效率。
  • 并发数与TaskGroup计算:然后,根据用户配置的并发数,DataX会计算需要分配多少个TaskGroup。计算的方式是将总的Task数量除以每个TaskGroup中的Task数量(通常为5),从而得到TaskGroup的数量。
  • TaskGroup分配与启动:接下来,DataX会根据计算出的TaskGroup数量,将Task分配到各个TaskGroup中。每个TaskGroup会启动多个TaskExecutor来执行具体的Task。
  • TaskExecutor启动:当TaskGroup启动后,其中的TaskExecutor会启动ReaderThread和WriterThread。ReaderThread负责从数据源读取数据,WriterThread负责将数据写入目标端。这两个线程协同工作,实现了数据的读取、转换和写入过程。
  • 数据同步:在每个TaskExecutor中,ReaderThread和WriterThread会不断地从数据源读取数据,并将数据写入目标端,直到所有的数据都同步完成。
    整个调度流程依赖于Java底层线程池进行并发控制,DataX通过合理的调度策略和线程管理机制,实现了高效、稳定、可靠的数据同步。

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

DataX部署和配置

  • 工具部署

    • 方法一、直接下载DataX工具包:DataX下载地址

      下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

      $ cd  {YOUR_DATAX_HOME}/bin
      $ python datax.py {YOUR_JOB.json}
      

      自检脚本:
      python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

    • 方法二、下载DataX源码,自己编译:DataX源码

      (1)、下载DataX源码:

      $ git clone git@github.com:alibaba/DataX.git
      

      (2)、通过maven打包:

      $ cd  {DataX_source_code_home}
      $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
      

      打包成功,日志显示如下:

      [INFO] BUILD SUCCESS
      [INFO] -----------------------------------------------------------------
      [INFO] Total time: 08:12 min
      [INFO] Finished at: 2015-12-13T16:26:48+08:00
      [INFO] Final Memory: 133M/960M
      [INFO] -----------------------------------------------------------------
      

      打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:

      $ cd  {DataX_source_code_home}
      $ ls ./target/datax/datax/
      bin		conf		job		lib		log		log_perf	plugin		script		tmp
      
  • 配置示例:从stream读取数据并打印到控制台

    • 第一步、创建作业的配置文件(json格式)

      可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

      $ cd  {YOUR_DATAX_HOME}/bin
      $  python datax.py -r streamreader -w streamwriter
      DataX (UNKNOWN_DATAX_VERSION), From Alibaba !
      Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved.
      Please refer to the streamreader document:https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document:https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and  usepython {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
      to run the job.{"job": {"content": [{"reader": {"name": "streamreader", "parameter": {"column": [], "sliceRecordCount": ""}}, "writer": {"name": "streamwriter", "parameter": {"encoding": "", "print": true}}}], "setting": {"speed": {"channel": ""}}}
      }
      

      根据模板配置json如下:

      #stream2stream.json
      {"job": {"content": [{"reader": {"name": "streamreader","parameter": {"sliceRecordCount": 10,"column": [{"type": "long","value": "10"},{"type": "string","value": "hello,你好,世界-DataX"}]}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": 5}}}
      }
      
    • 第二步:启动DataX

      $ cd {YOUR_DATAX_DIR_BIN}
      $ python datax.py ./stream2stream.json 
      

      同步结束,显示日志如下:

      ...
      2023-12-17 11:20:25.263 [job-0] INFO  JobContainer - 
      任务启动时刻                    : 2023-12-17 11:20:15
      任务结束时刻                    : 2023-12-17 11:20:25
      任务总计耗时                    :                 10s
      任务平均流量                    :              205B/s
      记录写入速度                    :              5rec/s
      读出记录总数                    :                  50
      读写失败总数                    :                   0
      

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/253095.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

外包干了10个月,技术退步明显.......

先说一下自己的情况,大专生,18年通过校招进入武汉某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了四年的功能测…

苹果macbook电脑删除数据恢复该怎么做?Mac电脑误删文件的恢复方法

苹果电脑删除数据恢复该怎么做?Mac电脑误删文件的恢复方法 如何在Mac上恢复误删除的文件?在日常使用Mac电脑时,无论是工作还是娱乐,我们都会创建和处理大量的文件。然而,有时候可能会不小心删除一些重要的文件&#x…

电缆线的阻抗50Ω,真正含义是什么?

当我们提到电缆线的阻抗时,它到底是什么意思?RG58电缆通常指的是50Ω的电缆线。它的真正含义是什么?假如取一段3英尺(0.9144米)长的RG58电缆线,并且在前端测量信号路径与返回路径之间的阻抗。那么测得的阻抗是多少?当然…

unity实现第一人称和第三人称

在角色设置两个挂载点,第一人称时,相机放在eys上面,切换第三人称时,放置到3rd节点上面,调整节点位置,达到期望效果 代码 void ThirdView(){Debug.Log("切换到第三人称");camera.SetParent(third…

财务数据处理问题及解决方案分享

一、平台介绍 财务自营计费主要承接京东自营数据在整个供应链中由C端转B端的功能实现,在整个供应链中属于靠后的阶段了,系统主要功能是计费和向B端的汇总。 二、问题描述 近年来自营计费数据量大增,有百亿的数据量,一天中汇总占…

ChatGPT Plus如何升级?信用卡付款失败怎么办?如何使用信用卡升级 ChatGPT Plus?

ChatGPT Plus是OpenAI提供的一种高级服务,它相较于标准版本,提供了更快的响应速度、更强大的功能,并且用户可以优先体验到新推出的功能。 尽管许多用户愿意支付 20 美元的月费来订阅 GPT-4,但在实际支付过程中,特别是…

【ES数据可视化】kibana实现数据大屏

目录 1.概述 2.绘制数据大屏 2.1.准备数据 2.2.绘制大屏 3.嵌入项目中 1.概述 再来重新认识一下kibana: Kibana 是一个用于数据可视化和分析的开源工具,是 Elastic Stack(以前称为 ELK Stack)中的一部分,由 Ela…

机器学习 | 一文看懂SVM算法从原理到实现全解析

目录 初识SVM算法 SVM算法原理 SVM损失函数 SVM的核方法 数字识别器(实操) 初识SVM算法 支持向量机(Support Vector Machine,SVM)是一种经典的监督学习算法,用于解决二分类和多分类问题。其核心思想是通过在特征空间中找到一…

【Linux网络编程三】Udp套接字编程(简易版服务器)

【Linux网络编程三】Udp套接字编程(简易版服务器) 一.创建套接字二.绑定网络信息1.构建通信类型2.填充网络信息①网络字节序的port②string类型的ip地址 3.最终绑定 三.读收消息1.服务器端接收消息recvfrom2.服务器端发送消息sendto3.客户端端发送消息sendto4.客户端…

海康威视球机摄像头运动目标检测、跟踪与轨迹预测

一、总体方案设计 运动目标检测与跟踪方案设计涉及视频流的实时拍摄、目标检测、轨迹预测以及云台控制。以下是四个步骤的详细设计: 1.室内场景视频流拍摄 使用海康威视球机摄像头进行室内视频流的实时拍摄。确保摄像头能覆盖整个室内空间,以便捕捉所…

如何修改远程端服务器密钥

前言 一段时间没改密码后,远程就会自动提示CtrlAltEnd键修改密码。但我电脑是笔记本,没有end键。打开屏幕键盘按这三个键也没用。 解决方法 打开远程 1、远程端WINC 输入osk 可以发现打开了屏幕键盘 2、电脑键盘同时按住CtrlAlt(若自身电…

【iOS ARKit】人形提取

为解决人形分离和深度估计问题,ARKit 新增加了 Segmentation Buffer(人体分隔缓冲区)和Estimated Depth Data Buffer(深度估计缓冲区)两个缓冲区。人体分隔缓冲区作用类似于图形渲染管线中的 Stencil Buffer&#xff0…

机器学习--K近邻算法,以及python中通过Scikit-learn库实现K近邻算法API使用技巧

文章目录 1.K-近邻算法思想2.K-近邻算法(KNN)概念3.电影类型分析4.KNN算法流程总结5.k近邻算法api初步使用机器学习库scikit-learn1 Scikit-learn工具介绍2.安装3.Scikit-learn包含的内容4.K-近邻算法API5.案例5.1 步骤分析5.2 代码过程 1.K-近邻算法思想 假如你有一天来到北京…

2月6日作业

1.现有无序序列数组为23,24,12,5,33,5347&#xff0c;请使用以下排序实现编程 函数1:请使用冒泡排序实现升序排序 函数2:请使用简单选择排序实现升序排序 函数3:请使用快速排序实现升序排序 函数4:请使用插入排序实现升序排序 #include<stdio.h> #include<string.h&…

嵌入式软件bug分析基本要求

摘要&#xff1a;软件从来不是一次就能完美的&#xff0c;需要以包容的眼光看待它的残缺。那问题究竟为何产生&#xff0c;如何去除呢&#xff1f; 1、软件问题从哪来 软件缺陷问题千千万万&#xff0c;主要是需求、实现、和运行环境三方面。 1.1 需求描述偏差 客户角度的描…

十分钟GIS——geoserver+postgis+udig从零开始发布地图服务

1数据库部署 1.1PostgreSql安装 下载到安装文件后&#xff08;postgresql-9.2.19-1-windows-x64.exe&#xff09;&#xff0c;双击安装。 指定安装目录&#xff0c;如下图所示 指定数据库文件存放目录位置&#xff0c;如下图所示 指定数据库访问管理员密码&#xff0c;如下图所…

正点原子--STM32通用定时器学习笔记(2)

1. 通用定时器输入捕获部分框图介绍 捕获/比较通道的输入部分&#xff08;通道1&#xff09; 输入通道映射CC1S[1:0]→采样频率CKD[1:0]→滤波方式IC1F[3:0]→边沿检测方式CC1P→捕获分频ICPS[1:0]→使能捕获CC1E 输入部分对相应的TIx输入信号采样&#xff0c;并产生一个滤波后…

【Linux取经路】探寻shell的实现原理

文章目录 一、打印命令行提示符二、读取键盘输入的指令三、指令切割四、普通命令的执行五、内建指令执行5.1 cd指令5.2 export指令5.3 echo指令 六、结语 一、打印命令行提示符 const char* getusername() // 获取用户名 {return getenv("USER"); }const char* geth…

【教程】Linux使用git自动备份和使用支持文件恢复的rm命令

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 背景介绍 首先非常不幸地告诉你&#xff1a;Linux 系统的标准 rm 命令不支持文件恢复功能。一旦使用 rm 删除了文件或目录&#xff0c;它们就会从文件系统中永久删除&#xff0c;除非你使用专门的文件恢复工具尝试…

【Spring基础】从0开始学习Spring(2)

前言 在上篇文章&#xff0c;我已经讲了Spring中最核心的知识点&#xff1a;IoC&#xff08;控制反转&#xff09;以及DI&#xff08;依赖注入&#xff09;。这篇文章&#xff0c;我将讲一下关于Spring框架中的其它比较琐碎但是又还是挺重要的知识点&#xff0c;因此&#xff…