数据集成工具 ---- datax 3.0

1、datax:

        是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

2、参考网址文献:

https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

3、Datax的框架设计:

Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

        1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

        2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

        3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

4、Datax的核心架构:

Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

模块的核心介绍:

        1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

        2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

        3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

        4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

        5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

5、Datax的核心优势:

        1、可靠的数据质量监控

        2、丰富的数据转换功能

        3、精准的控制速度

        4、容错机制:

                1、线程内部重试

                        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                2、线程级别重试

                        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

        5、极简的体验

6、Datax与Sqoop的区别:
功能Dataxsqoop
运行模式单进程多线程MR
分布式是不支持分布式支持
流控需要定制
统计信息支持不支持,分布式的数据收集不方便
数据校验只有core部分有校验功能不支持,分布式的数据收集不方便
监控需要定制需要定制
7、Datax部署:

1、下载jar包:

        下载路径:https://github.com/alibaba/DataX

2、解压文件,配置环境变量:  

#解压jar包
tar -zxvf datax.tar.gz#配置环境变量:
vim /etc/profileexport   DATAX_HOME=/user/loacl/soft/datax
export   PATH=.:$PATH:$DATAX_HOME/bin#配置好环境变量,让配置文件生效
source /etc/profile

3、使执行文件拥有执行权:

添加执行权:chmod +x  data.py
8、Datax的使用:
在datax中会自动的生成模板的命令:datax.py -r streamreader -w streamwriter
        1、streamreader  to  streamwriter,数据打印在控制台上面

参数说明:

"sliceRecordCount": 100  #指定打印的个数"channel": 1  #指定并发度
#创建json文件:vim streamreadertostreamwriter.json{"job": {"content": [{"reader": {"name": "streamreader", "parameter": {"column": [{"type":"string","value":"wyz"},{"type":"int","value":"18"}], "sliceRecordCount": 100  #指定打印的个数}}, "writer": {"name": "streamwriter", "parameter": {"encoding": "", "print": true}}}], "setting": {"speed": {"channel": 1  #指定并发度}}}
}#脚本执行命令:
datax.py streamreadertostreamwriter.json
        2、mysql to mysql
1、可以通过命令获取模板:
datax.py  -r mysqlreader -w mysqlwriter 2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": ""      #不是必须要写的,作用是可以在读数据时进行一次过滤}}, "writer": {"name": "mysqlwriter", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", "table": ["data_test"]}], "password": "123456", "preSql": [],   #不是必须写的,作用是再写入数据前可以执行该sql"session": [], "username": "root", "writeMode": "insert"   #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update}}}], "setting": {"speed": {"channel": 1}}}
}#执行脚本:
datax.py   mysqlreaderTomysqlwriter.json

 将数据写入到mysql时,写入的表是需要提前创建的。

        3、mysql to hdfs

参数解释:

使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc"fileType": "text",  #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile"compress": "", #指定文件的压缩形式,不指定代表不用压缩,text支持的压缩方式:gzip,bzip2,orc支持的压缩方式有NONE和SNAPPY"writeMode": "append"   #表示的是数据在写入的操作,分成三种:append,写入前不做任何处理  nonconflit 如果文件存在,直接报错  truncate:如果文件存在,那就先删除在写入"path": "/bigdata25/datax/datax_mysqltohdfs/"  #文件不存在是需要提前创建的
vim mysqlTohdfs.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": ""}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"id","type":"string"},{"name":"stu_name","type":"string" },{"name":"age","type":"string"},{"name":"clazz","type":"string"},{"name":"gender","type":"string" }], "compress": "", "defaultFS": "hdfs://master:9000", "fieldDelimiter": ",", "fileName": "stu_mysqltohdfs", "fileType": "text",  "path": "/bigdata25/datax/datax_mysqltohdfs/", "writeMode": "append"                         												}}}], "setting": {"speed": {"channel": 1}}}
}#执行脚本:
datax.py mysqlTohdfs.json
        4、mysql to hive

 原理思想:

实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
原理:创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

前期准备:

前期准备:
启动hive(后台启动):nohup hive --service metastore &
连接hive:hive创建hive表(在没有说明的情况下一般在都是创建一个外部表)
创建一个datax数据库:create database datax;
切换数据库:use datax
创建外部表:create external table if not exists datax_mysqltohive(id string,name string,age int,clazz string,gender string)
row format delimited  fields terminated  by ',' stored as textfile

编写脚本:

vim mysqlTohive.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": ""}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"id","type":"string"},{"name":"stu_name","type":"string" },{"name":"age","type":"string"},{"name":"clazz","type":"string"},{"name":"gender","type":"string" }], "compress": "", "defaultFS": "hdfs://master:9000", "fieldDelimiter": ",", "fileName": "datax_mysqltohive1", "fileType": "text", "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", "writeMode": "append"}}}], "setting": {"speed": {"channel": 1}}}
}#执行脚本命令:
datax.py  mysqlTohive.json
        5、mysql to hbase(Hbase11XWriter)

参数解释:

"mode": "normal" 写hbase的模式,目前只支持normal模式"hbaseConfig"  {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式"table": "writer" 表的名称,大小写比较敏感"encoding" 编码方式"rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量"versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

前期准备:

前期准备:启动zookeeper:zkServer.sh start(每一个节点上都是需要启动的)
查看zk的状态:zkServer.sh status 启动hbase:start-hbase.sh连接hbase:sqlline.py master,node1,node2进入hbase的客户端:hbase shell hbase中查看表的命令:!table
退出命令 !quit
查看表:list
在hbase中创建创建表,指定表名和列簇:create 'student','cf1'

编写脚本:

vim mysqlTohbase.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": ""}}, "writer": {"name": "hbase11xwriter","parameter": {"hbaseConfig": {"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"},"table": "NEW_STU","mode": "normal","rowkeyColumn": [{"index":0,"type":"string"},{"index":-1,"type":"string","value":"_"}],"column": [{"index":1,"name": "cf1:name","type": "string"},{"index":2,"name": "cf1:age","type": "int"},{"index":3,"name": "cf1:clazz","type": "string"},{"index":4,"name": "cf1:gender","type": "string"}],"versionColumn":{"index": -1,"value":"123456789"},"encoding": "utf-8"}}}],"setting": {"speed": {"channel": 1}}  }
}#执行脚本
datax.py mysqlTohbase.json
        6、mysql增量同步数据到hive中。

最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

vim mysqlTohive.json{"job": {"content": [{"reader": {"name": "mysqlreader", "parameter": {"column": ["id","name","age","clazz","gender"], "connection": [{"jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], "table": ["stu"]}], "password": "123456", "username": "root", "where": "id >20"}}, "writer": {"name": "hdfswriter", "parameter": {"column": [{"name":"id","type":"string"},{"name":"stu_name","type":"string" },{"name":"age","type":"string"},{"name":"clazz","type":"string"},{"name":"gender","type":"string" }], "compress": "", "defaultFS": "hdfs://master:9000", "fieldDelimiter": ",", "fileName": "datax_mysqltohive1", "fileType": "text", "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", "writeMode": "append"}}}], "setting": {"speed": {"channel": 1}}}
}#执行脚本命令:
datax.py  mysqlTohive.json
9、在使用datax过程中出现的错误:

1、配置文件出现错误,脚本不完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/276498.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis:持久化、线程模型、大 key

Redis持久化方式有什么方式? Redis 的读写操作都是在内存中,所以 Redis 性能才会高,但是当 Redis 重启后,内存中的数据就会丢失,那为了保证内存中的数据不会丢失,Redis 实现了数据持久化的机制&#xff0c…

【CenterFusion】CenterFusion网络架构概述

一、CenterFusion 概述 这个项目,重点研究毫米波雷达和相机传感器融合的方法利用毫米波雷达传感器数据和相机传感器数据进行 3D 目标检测并在 NuScenes 数据集上面进行评估CenterFusion 网络架构: CenterFusion 网络架构首先利用全卷积骨干网提取目标物…

【ArcGIS】栅格数据进行标准化(归一化)处理

栅格数据进行标准化(归一化)处理 方法1:栅格计算器方法2:模糊分析参考 栅格数据进行标准化(归一化)处理 方法1:栅格计算器 栅格计算器(Raster Calculator) 计算完毕后,得到归一化…

谷粒商城——分布式基础(全栈开发篇第一部分)

文章目录 一、服务治理网路数据支撑日志处理ELK应用监控集成工具开发工具 二、环境创建1、虚拟机创建2、虚拟机安装docker等1. 安装docker1. 配置阿里docker3.docker安装mysql错误 4、docker安装redis 3、软件1.Maven 阿里云镜像1.8jdk2、idea lombokmybatisX ,3、 …

[LVGL]:MACOS下使用LVGL模拟器

如何在MACOS下使用lvgl模拟器 1.安装必要环境 brew install sdl2查看sdl2安装位置: (base) ➜ ~ brew list sdl2 /opt/homebrew/Cellar/sdl2/2.30.1/bin/sdl2-config /opt/homebrew/Cellar/sdl2/2.30.1/include/SDL2/ (78 files) /opt/homebrew/Cellar/sdl2/2.3…

Vue3基础笔记(1)模版语法 属性绑定 渲染

Vue全称Vue.js是一种渐进式的JavaScript框架,采用自底向上增量开发的设计,核心库只关注视图层。性能丰富,完全有能力驱动采用单文件组件和Vue生态系统支持的库开发的复杂单页应用,适用于场景丰富的web前端框架。灵活性和可逐步集成…

Vue.js+SpringBoot开发个人健康管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 健康档案模块2.2 体检档案模块2.3 健康咨询模块 三、系统展示四、核心代码4.1 查询健康档案4.2 新增健康档案4.3 查询体检档案4.4 新增体检档案4.5 新增健康咨询 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpri…

第十四届蓝桥杯省赛真题 Java 研究生 组【原卷】

文章目录 发现宝藏【考生须知】试题 A: 特殊日期试题 B: 与或异或试题 C: 棋盘试题 D: 子矩阵试题 E : \mathrm{E}: E: 互质数的个数试题 F: 小蓝的旅行计划试题 G: 奇怪的数试题 H: 太阳试题 I: 高塔试题 J \mathrm{J} J : 反异或 01 串 发现宝藏 前些天发现了一个巨牛的人…

YOLOv9改进 添加可变形注意力机制DAttention

一、Deformable Attention Transformer论文 论文地址:arxiv.org/pdf/2201.00520.pdf 二、Deformable Attention Transformer注意力结构 Deformable Attention Transformer包含可变形注意力机制,允许模型根据输入的内容动态调整注意力权重。在传统的Transformer中,注意力是…

C语言从入门到实战————数组和指针的深入理解

前言 在C语言中,数组和指针有的密切得联系,因为数组名本身就相当于一个指针常量。指针是一个变量,专门用来存储另一个变量的内存地址,通过这个地址可以访问和操作该变量的值,同时也包括数组。数组是一组连续存储的同类…

社交革命的引领者:探索Facebook如何改变我们的生活方式

1.数字社交的兴起 随着互联网的普及,社交媒体成为我们日常生活的重要组成部分。Facebook作为其中的先驱,从最初的社交网络演变成了一个拥有数十亿用户的全球化平台。它不仅改变了我们与世界互动的方式,还深刻影响了我们的社交习惯、人际关系以…

nut-ui组件库icon中使用阿里图标

1.需求 基本每个移动端组件库都有组件 icon组件 图标组件、 但是很多组件库中并找不到我们需要的图标 这时候 大家有可能会找图标库 最大众的就是iconfont的图标了 2.使用 有很多方式去使用这个东西 比如将再限链接中的css引入 在使用 直接下载图标 symbol 方式 等....…

解锁未知:探索 Web3 的创新与前景

在数字化时代的潮流下,Web3作为下一代互联网的关键构建,正引领着数字经济的崭新篇章。本文将深入探讨Web3的创新特性及其对未来发展的影响。 1. Web3 的崭新定义 Web3不仅是技术的革新,更是一种理念的演进。其核心特征包括去中心化、可编程性…

Linux编译器gcc/g++的功能与使用

一、程序的生成 首先,我们知道程序的编译分为四步: 1、预处理 2、编译 3、汇编 4、链接 1.1预处理 预处理功能主要包括头文件展开、宏定义、文件包含、条件编译、去注释等。 所谓的头文件展开就是在预处理时候,将头文件内容拷贝至源文…

探索TikTok云手机在社交媒体营销的作用

近年来,TikTok作为全球短视频平台之一,其用户基数呈现持续增长的趋势。伴随社交媒体的蓬勃发展,企业和个人纷纷涌入TikTok平台,追求更广泛的曝光和用户互动。为满足这一需求,TikTok云手机应运而生。本文将深度剖析TikT…

ETH共识升级之路

简介 根据我们之前的介绍,了解到ETH网络的共识方式,已经从 PoW 切换到了 PoS,今天我们就回顾下升级之路,以及升级带来的影响 最早的共识机制 PoW 以太坊创建之初采用了类似比特币的工作量证明机制,即矿工通过计算哈希函…

HandyControl PropertyGrid及自定义编辑器

前提条件 项目引入对应HandyControl对应版本包。 使用案例 UI部分 <Window xmlns:hc"https://handyorg.github.io/handycontrol"><hc:TabControl><hc:TabItem Header"默认样式"><hc:PropertyGrid Width"380" SelectedO…

Rust 深度学习库 Burn

一、概述 Burn 它是一个新的综合动态深度学习框架&#xff0c;使用 Rust 构建的&#xff0c;以极高的灵活性、计算效率和可移植性作为其主要目标。 Rust Burn 是一个以灵活性、高性能和易用性为核心设计原则工具&#xff0c;主打就是灵活性 、高性能 及易用性。 二、Rust B…

[蓝桥杯]-最大的通过数-CPP-二分查找、前缀和

目录 一、题目描述&#xff1a; 二、整体思路&#xff1a; 三、代码&#xff1a; 一、题目描述&#xff1a; 二、整体思路&#xff1a; 首先要知道不是他们同时选择序号一样的关卡通关&#xff0c;而是两人同时进行两个入口闯关。就是说两条通道存在相同关卡编号的的关卡被通…