王炸组合:Dolphinscheudler 3.1.*搭配SeaT unnel2.3.*高效完成异构数据数据集成

file

概述

本篇主要介绍如何通过Dolphinscheduler海豚调度搭配Seatunnel完成异构数据源之间的数据同步功能,这个在大数据流批一体数仓建设的过程中是一个非常好的解决方案, 稳定高效,只要用上了你肯定爱不释手。

环境准备

  • dolphinscheduler集群 >= 3.1.5
  • dolphinscheduler3.1.5版本源码
  • Seatunnel集群 >= 2.3.3

没有安装好以上准备环境的童鞋,请先参考我的另外两篇文章完成基础环境搭建基于Seatunnel最新2.3.5版本分布式集群安装部署指南(小白版)及dolphinscheduler分布式集群部署指南(小白版)再回到章节继续。

配置文件修改

这里说明一下, 通过海豚调度配置的Seatunnel数据同步任务最后都会被分配到DS集群的某个Worker组或者某个worker节点上进行执行,所以你要保证你的DS集群的目标worker节点上也安装了Seatunnel服务。这很重要,因为实际dolphisncheduler中定义的seatunnel任务实例到最后都是需要调用worker节点上的seatunnel服务在本地执行seatunnel的任务启动命令来完成任务提交和运行。

Dolphinscheduler的配置文件修改

因为我们需要使用seatunnel完成数据集成,所以我们需要在dolphinscheduler的系统环境变量中将我们的Seatunnel的安装目录进行配置。

找到你的dolphinscheduler主节点的安装目录下的$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh

设置SEATUNNEL_HOME的访问目录,将SEATUNNEL_HOME设置为你自己的SeaTunnel安装目录。

export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}

然后保存重启Dolphinscheduler集群即可完成配置修改同步到所有的api-server、master-server及worker-server节点。

Dolphinscheduler部分源码修改

为什么要修改Dolphinscheduler的源码? 因为我这里使用的Seatunnel的版本是2.3.5,使用的引擎不是Seatunnel的默认引擎, 用的是Spark引擎, Spark我用的版本是2.4.5, 所有我最后在命令执行的命令如下:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template

如果我用的是Spark3.X的版本,我执行命令如下:

$SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template

然而在Dolphinscheduler3.1.5版本的Seatunnel任务插件中,存在一些问题没办法兼容, 首先是前端,这里引擎只支持Spark和Flink,没有针对具体的版本进行兼容,没办法自由的选择使用Spark2、Spark3还是FIink13、Flink15。 file

其次就是后端的代码。 file

找到EngineEnum类, 修改一下代码如下: file

public enum EngineEnum {// FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),// SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");private String command;EngineEnum(String command) {this.command = command;}public String getCommand() {return command;}
}

这样修改完毕之后就OK了, 然后编译打包dolphinscheduler的源码。

更新Dolphinscheduler集群中的SeaTunnel任务插件

项目编译打包完成之后,找到dolphinscheduler-task-seatunnel工程下的target目录下的dolphinscheduler-task-seatunnel-3.1.5.jar包, 上传到你的dolphinscheduler集群的主节点。

file

然后将主节点上DS安装目录下的api-server/libsmaster-server/libsworker-server/libsalert-server/libs目录(其实这里可以只替换woker-server/libs目录)下的dolphinscheduler-task-seatunnel-3.1.5.jar重命名为dolphinscheduler-task-seatunnel-3.1.5.jar.20240606(带上日期方便知道变更时间)。

file

然后将我们编译的dolphinscheduler-task-seatunnel-3.1.5.jar拷贝到这几个目录(api-server/libs、master-server/libs、worker-server/libs、alert-server/libs目录,确认一下是不是所有目录下都有这个dolphinscheduler-task-seatunnel-3.1.5.jar,没有的目录就直接略过)下。

然后使用主节点上的分发脚本,将api-server/libsmaster-server/libsworker-server/libsalert-server/libs的修改同步到其他的DS节点上,分发完成之后,检查一下分发是否成功。

最后就是重启我们的DS集群,通过以上步骤我们就完成了Dolphisncheduler中SeaTunnel插件的升级适配。

测试验证

我们通过dolphinscheduler的工作流定义页面定义一个Seatunnel数据同步的任务, 完成Oracle数据库表采集到MySQL数据库的任务, 下面我们来操作。

关于seatunnel任务配置脚本文件,官网的文档介绍如下:

  • Source: https://seatunnel.incubator.apache.org/zh-CN/docs/category/source-v2
  • Transform: https://seatunnel.incubator.apache.org/zh-CN/docs/category/transform-v2
  • Sink: https://seatunnel.incubator.apache.org/zh-CN/docs/category/sink-v2/

Source输入源配置定义说明

这里我们的输入原始Oracle, 所以直接从Source中查找Oracle相关的配置如何定义,官网给我们提供了不少任务示例,:

简单任务示例
# Defining the runtime environment
env {parallelism = 4job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:oracle:thin:@datasource01:1523:xe"driver = "oracle.jdbc.OracleDriver"user = "root"password = "123456"query = "SELECT * FROM TEST_TABLE"}
}transform {}sink {Console {}
}
按分区列并行任务示例

并行读取你配置的分片字段和分片数据,如果你想读取整个表,可以这样做

env {parallelism = 4job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:oracle:thin:@datasource01:1523:xe"driver = "oracle.jdbc.OracleDriver"connection_check_timeout_sec = 100user = "root"password = "123456"# 根据需要定义查询逻辑query = "SELECT * FROM TEST_TABLE"# 设置并行分片读取字段partition_column = "ID"# 分区切片数量partition_num = 10properties {database.oracle.jdbc.timezoneAsRegion = "false"}}
}
sink {Console {}
}
按主键或唯一索引并行任务示例

配置table_path会开启自动分割,可以配置split.*来调整分割策略

env {parallelism = 4job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:oracle:thin:@datasource01:1523:xe"driver = "oracle.jdbc.OracleDriver"connection_check_timeout_sec = 100user = "root"password = "123456"table_path = "DA.SCHEMA1.TABLE1"query = "select * from SCHEMA1.TABLE1"split.size = 10000}
}sink {Console {}
}
并行上下限任务示例

指定查询的上下限内的数据效率更高按照你配置的上下限来读取你的数据源效率更高

source {Jdbc {url = "jdbc:oracle:thin:@datasource01:1523:xe"driver = "oracle.jdbc.OracleDriver"connection_check_timeout_sec = 100user = "root"password = "123456"# Define query logic as requiredquery = "SELECT * FROM TEST_TABLE"partition_column = "ID"# Read start boundarypartition_lower_bound = 1# Read end boundarypartition_upper_bound = 500partition_num = 10}
}
多表读取任务示例

配置table_list会开启自动分割,可以通过配置split.来调整分割策略*

env {job.mode = "BATCH"parallelism = 4
}
source {Jdbc {url = "jdbc:oracle:thin:@datasource01:1523:xe"driver = "oracle.jdbc.OracleDriver"connection_check_timeout_sec = 100user = "root"password = "123456""table_list"=[{"table_path"="XE.TEST.USER_INFO"},{"table_path"="XE.TEST.YOURTABLENAME"}]#where_condition= "where id > 100"split.size = 10000#split.even-distribution.factor.upper-bound = 100#split.even-distribution.factor.lower-bound = 0.05#split.sample-sharding.threshold = 1000#split.inverse-sampling.rate = 1000}
}sink {Console {}
}

Sink输出源配置定义说明

简单任务示例

本示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送到JDBC Sink。FakeSource一共生成16行数据(row.num=16),每行有两个字段name(string类型)和age(int类型)。最终的目标表为test_table,表中同样会有16行数据。运行此作业之前,你需要在mysql中创建数据库test和表test_table。如果你还没有安装和部署SeaTunnel,你需要按照安装SeaTunnel中的说明安装并部署SeaTunnel。然后按照快速开始使用SeaTunnel引擎中的说明运行此作业。

env {parallelism = 1job.mode = "BATCH"
}source {FakeSource {parallelism = 1result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}transform {}sink {jdbc {url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"user = "root"password = "123456"query = "insert into test_table(name,age) values(?,?)"}
}
生成输出SQL任务示例

本示例无需编写复杂的sql语句,您可以配置输出端数据库名称表名称来自动为您生成添加语句

sink {jdbc {url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"user = "root"password = "123456"# 根据数据库表名自动生成sql语句generate_sink_sql = truedatabase = testtable = test_table}
}
精确任务示例

对于需要精确写入场景,我们保证精确一次。

sink {jdbc {url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"max_retries = 0user = "root"password = "123456"query = "insert into test_table(name,age) values(?,?)"is_exactly_once = "true"xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"}
}
CDC(变更数据捕获)事件

我们也支持CDC变更数据在这种情况下,您需要配置数据库,表和primary_keys。

sink {jdbc {url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"user = "root"password = "123456" generate_sink_sql = true# You need to configure both database and tabledatabase = testtable = sink_tableprimary_keys = ["id","name"]field_ide = UPPERCASEschema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"data_save_mode="APPEND_DATA"}
}

完整测试脚本配置文件

下面给出本示例中完整的配置文件示例

env {parallelism = 4job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"driver = "oracle.jdbc.OracleDriver"user = "appuser001"password = "appuser001"query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"}
}transform {}sink {jdbc {url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"user = "appuser001"password = "appuser001" generate_sink_sql = "true"database = "hive"table = "met_com_icdoperation_ls"schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"data_save_mode="APPEND_DATA"}
}

file

将上述脚本中的数据库配置信息修改成你的数据连接配置, 然后将脚本覆盖到上图脚本输入中, 保存工作流, 上线之后启动工作流。

file

到对应数据库验证

原来的Oracle数据库表

file

同步之后的MySQL数据库表

file

任务成功了, 数据也成功同步过来了, OK,测试通过!大家接下来可以在这个Demo的基础上进行更多的扩展和挖掘, 实战的多了, 你对于Dolphinscheduler和Seatunnel的架构和原理的理解就会越来越深入了,慢慢你也可以通过扩展源码来升级和拓展这些优秀开源框架的功能了。创作不易,如果我的文章对你有帮助欢迎点赞,收藏,送你一朵小红花~

原文链接:https://blog.csdn.net/qq_41865652/article/details/140971419

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/1889.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Wireshark抓包教程(2024最新版个人笔记)

改内容是个人的学习笔记 Wireshark抓包教程(2024最新版)_哔哩哔哩_bilibili 该课程笔记1-16 wireshark基础 什么是抓包工具:用来抓取数据包的一个软件 wireshark的功能:用来网络故障排查;用来学习网络技术 wireshark下…

Java Stream流操作List全攻略:Filter、Sort、GroupBy、Average、Sum实践

在Java 8及更高版本中,Stream API为集合处理带来了革命性的改变。本文将深入解析如何运用Stream对List进行高效的操作,包括筛选(Filter)、排序(Sort)、分组(GroupBy)、求平均值&…

机器学习头歌(第三部分-强化学习)

一、强化学习及其关键元素 二、强化学习的分类 三、任务与奖赏 import numpy as np# 迷宫定义 maze np.array([[0, 0, 0, 0, 0],[0, -1, -1, 0, 0],[0, 0, 0, -1, 0],[-1, -1, 0, -1, 0],[0, 0, 0, -1, 1] ])# 定义强化学习的参数 gamma 0.8 # 折扣因子 alpha 0.5 # 学习率…

小游戏前端地区获取

目前前端获取除了太平洋,没有其它的了。 //在JS中都是使用的UTF-8,然而requst请求后显示GBK却是乱码,对传入的GBK字符串,要用数据流接收,responseType: "arraybuffer" tt.request({url: "https://whoi…

美摄科技为企业打造专属PC端视频编辑私有化部署方案

美摄科技,作为视频编辑技术的先行者,凭借其在多媒体处理领域的深厚积累,为企业量身打造了PC端视频编辑私有化部署解决方案,旨在帮助企业构建高效、安全、定制化的视频创作平台,赋能企业内容创新,提升品牌影…

w160社区智慧养老监护管理平台设计与实现

🙊作者简介:多年一线开发工作经验,原创团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹赠送计算机毕业设计600个选题excel文…

【ArcGIS微课1000例】0138:ArcGIS栅格数据每个像元值转为Excel文本进行统计分析、做图表

本文讲述在ArcGIS中,以globeland30数据为例,将栅格数据每个像元值转为Excel文本,便于在Excel中进行统计分析。 文章目录 一、加载globeland30数据二、栅格转点三、像元值提取至点四、Excel打开一、加载globeland30数据 打开配套实验数据包中的0138.rar中的tif格式栅格土地覆…

本地部署项目管理工具 Leantime 并实现外部访问

Leantime 是一款开源 AI 项目。它可以在本地直接运行大语言模型 LLM、生成图像、音频等。直接降低了用户使用AI的门褴。本文将详细的介绍如何利用 Docker 在本地部署 Leantime 并结合路由侠实现外网访问本地部署的 Leantime 。 第一步,本地部署安装 Leantime 1&am…

【HTML+CSS+JS+VUE】web前端教程-36-JavaScript简介

JavaScript介绍 JavaScript是一种轻量级的脚本语言,所谓脚本语言,指的是它不具备开发操作系统的能力,而是用来编写控制其他大型应用程序的“脚本” JavaScript是一种嵌入式语言,它本身提供的核心语法不算很多 为什么学习JavaScri…

what?ngify 比 axios 更好用,更强大?

文章目录 前言一、什么是ngify?二、npm安装三、发起请求3.1 获取 JSON 数据3.2 获取其他类型的数据3.3 改变服务器状态3.4 设置 URL 参数3.5 设置请求标头3.6 与服务器响应事件交互3.7 接收原始进度事件3.8 处理请求失败3.9 Http Observables 四、更换 HTTP 请求实现…

WINFORM - DevExpress -> gridcontrol ---->控件(ColumnEdit控件)

ImageComboBoxEdit--带图片的下拉菜单 DevExpress:带图片的下拉菜单ImageComboBoxEdit_weixin_34313182的博客-CSDN博客 ImageEdit--图片按钮 DevExpress控件中的gridcontrol表格控件,如何在属性中设置某一列显示为图片(图片按钮&#xff…

7.STM32F407ZGT6-RTC

参考: 1.正点原子 前言: RTC实时时钟是很基本的外设,用来记录绝对时间。做个总结,达到: 1.学习RTC的原理和概念。 2.通过STM32CubeMX快速配置RTC。 27.1 RTC 时钟简介 STM32F407 的实时时钟(RTC&#xf…

Vue2+OpenLayers添加/删除点、点击事件功能实现(提供Gitee源码)

目录 一、案例截图 二、安装OpenLayers库 三、安装Element-UI 四、代码实现 4.1、添加一个点 4.2、删除所有点 4.3、根据经纬度删除点 4.4、给点添加点击事件 4.5、完整代码 五、Gitee源码 一、案例截图 可以新增/删除标记点,点击标记点可以获取到当前标…

2025宝塔API一键建站系统PHP源码

源码介绍 2025宝塔API一键建站系统PHP源码,对接自己的支付,虚拟主机也能搭建,小白式建站系统,基于宝塔面板搭建的建站系统,功能丰富,多款模板,每日更新 上传源码到服务器,浏览器访问…

【计算机网络】深入浅出计算机网络

第一章 计算机网络在信息时代的作用 计算机网络已由一种通信基础设施发展成一种重要的信息服务基础设施 CNNIC 中国互联网网络信息中心 因特网概述 网络、互联网和因特网 网络(Network)由若干结点(Node)和连接这些结点的链路…

HTTP/HTTPS ⑤-CA证书 || 中间人攻击 || SSL/TLS

这里是Themberfue ✨上节课我们聊到了对称加密和非对称加密,实际上,单纯地非对称加密并不能保证数据不被窃取,我们还需要一个更加重要的东西——证书 中间人攻击 通过非对称加密生成私钥priKey和公钥pubKey用来加密对称加密生成的密钥&…

C# 25Dpoint

C# 25Dpoint ,做一个备份 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms;namespace _25Dpoint {public partial cl…

网络层协议-----IP协议

目录 1.认识IP地址 2.IP地址的分类 3.子网划分 4.公网IP和私网IP 5.IP协议 6.如何解决IP地址不够用 1.认识IP地址 IP 地址(Internet Protocol Address)是指互联网协议地址。 它是分配给连接到互联网的设备(如计算机、服务器、智能手机…

如何在Jupyter中快速切换Anaconda里不同的虚拟环境

目录 介绍 操作步骤 1. 选择环境,安装内核 2. 注册内核 3. 完工。 视频教程 介绍 很多网友在使用Jupyter的时候会遇到各种各样的问题,其中一个比较麻烦的问题就是我在Anaconda有多个Python的环境里面,如何让jupyter快速切换不同的Pyt…

分布式数据存储基础与HDFS操作实践(副本)

以下为作者本人撰写的报告,步骤略有繁琐,不建议作为参考内容,可以适当浏览,进一步理解。 一、实验目的 1、理解分布式文件系统的基本概念和工作原理。 2、掌握Hadoop分布式文件系统(HDFS)的基本操作。 …