使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;CREATE TABLE source_mysql_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,user_id VARCHAR2(255) NOT NULL,action VARCHAR2(255) NOT NULL,timestamp VARCHAR2(255) NOT NULL,PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;CREATE TABLE target_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;public class UserAction {private int id;private String userId;private String action;private String timestamp;// Getters and setterspublic int getId() {return id;}public void setId(int id) {this.id = id;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAction() {return action;}public void setAction(String action) {this.action = action;}public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}

编写Flink任务类

package com.example.flink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;@Component
public class FlinkETLJob implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {@Overridepublic UserAction map(UserAction value) throws Exception {// 进行清洗和转换value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库transformedStream.addSink(new MySQLSink());// 执行任务env.execute("Flink ETL Job");}public static class MySQLSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_mysql_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class OracleSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_oracle_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";private static final String JDBC_USER = "target_user";private static final String JDBC_PASSWORD = "target_password";private transient Connection connection;private transient PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";statement = connection.prepareStatement(sql);}@Overridepublic void flatMap(UserAction value, Collector<Void> out) throws Exception {statement.setString(1, value.getUserId());statement.setString(2, value.getAction());statement.setString(3, value.getTimestamp());statement.executeUpdate();}@Overridepublic void close() throws Exception {super.close();if (statement != null) {statement.close();}if (connection != null) {connection.close();}}}
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/359495.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024最新版Node.js下载安装及环境配置教程(非常详细)

一、进入官网地址下载安装包 官网&#xff1a;Node.js — Run JavaScript Everywhere 其他版本下载&#xff1a;Node.js — Download Node.js (nodejs.org) 选择对应你系统的Node.js版本 二、安装程序 &#xff08;1&#xff09;下载完成后&#xff0c;双击安装包&#xf…

Go的GUI Fyne开发环境搭建—Windows 11

安装go 到官网下载安装go安装包 https://go.dev/learn/ 通过如下命令检验安装是否成功&#xff0c;出现版本号则安装成功 go version安装国内go依赖包代理 go env -w GOPROXYhttps://goproxy.cn安装gcc编译器 直接用官网提供的安装建议第二条&#xff0c;到这个地址进行下载…

二刷算法训练营Day41 (Day40休息) | 动态规划(3/17)

目录 详细布置&#xff1a; 1. 背包问题理论基础 1.1 01背包 2. 46. 携带研究材料&#xff08;第六期模拟笔试&#xff09; 一维dp数组&#xff08;滚动数组&#xff09; 3. 416. 分割等和子集 详细布置&#xff1a; 1. 背包问题理论基础 但说实话&#xff0c;背包九讲…

C#开发-集合使用和技巧(六)特殊转换方法SelectMany的介绍和用法

C#开发-集合使用和技巧&#xff08;六&#xff09; 特殊转换方法SelectMany的介绍和用法 介绍使用示例Select与SelectMany对比特殊情况 介绍 SelectMany 方法在C#中用于将集合中的元素转换为其他类型的集合&#xff0c;并将这些集合扁平化为一个单一的序列。它是LINQ的一部分…

Unity URP下通过相机让部分Render不受后处理渲染

我们有时候不想某些对象受到后处理影响&#xff0c;找到了这样一个决绝办法&#xff0c;通过增加一个Overlay相机只照射这个模型来实现&#xff0c;下面看看如何实现。 第一步 首先我们拖一个测试场景&#xff0c;有如下一些元素 一个盒子&#xff0c;以后后处理&#xff0c…

Python武器库开发-武器库篇之ThinkPHP6 多语言本地文件包含漏洞(六十七)

Python武器库开发-武器库篇之ThinkPHP6 多语言本地文件包含漏洞&#xff08;六十七&#xff09; 漏洞环境搭建 这里我们使用Kali虚拟机安装docker并搭建vulhub靶场来进行ThinkPHP漏洞环境的安装&#xff0c;我们进入 ThinkPHP漏洞环境&#xff0c;可以 cd ThinkPHP&#xff0…

【Solr 学习笔记】Solr 源码启动教程

Solr 源码启动教程 本教程记录了如何通过 IDEA 启动并调试 Solr 源码&#xff0c;从 Solr9 开始 Solr 项目已由 ant 方式改成了 gradle 构建方式&#xff0c;本教程将以 Solr 9 为例进行演示&#xff0c;IDE 选择使用 IntelliJ IDEA。 Solr github 地址&#xff1a;https://gi…

【机器学习】机器学习重要方法——深度学习:理论、算法与实践

文章目录 引言第一章 深度学习的基本概念1.1 什么是深度学习1.2 深度学习的历史发展1.3 深度学习的关键组成部分 第二章 深度学习的核心算法2.1 反向传播算法2.2 卷积神经网络&#xff08;CNN&#xff09;2.3 循环神经网络&#xff08;RNN&#xff09; 第三章 深度学习的应用实…

AI交互及爬虫【数据分析】

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 Python 初阶 Python–语言基础与由来介绍 Python–…

区块链实验室(37) - 交叉编译百度xuperchain for arm64

纠结了很久&#xff0c;终于成功编译xuperchain for arm64。踩到1个坑&#xff0c;说明如下。 1、官方文档是这么说的&#xff1a;go语言版本推荐1.5-1.8 2、但是同一个页面&#xff0c;又是这么说的&#xff1a;不推荐使用1.11之前的版本。 3、问题来了&#xff1a;用什么版本…

2024年特种设备(门式起重机司机)考试真题题库。

181."ZZ"表示钢丝绳为&#xff08; &#xff09;。 A.右同向捻 B.左同向捻 C.右交互捻 D.左交互捻 答案:A 182.桥式起重机的金属结构主要由起重机桥架&#xff08;又称大车桥架&#xff09;、&#xff08; &#xff09;和操纵室&#xff08;司机室&#xff09;…

提升工作效率的实体和虚拟工具推荐

在现代工作中&#xff0c;我们常常需要利用各种工具来提高工作效率。本文将介绍一款实体工具和一款虚拟工具&#xff0c;它们都能够有效地提升工作效率&#xff0c;让我们更高效地完成任务。 实体工具&#xff1a;金鸣表格文字识别大师 金鸣表格文字识别大师是一款优秀的文字识…

Day 32:503. 下一个更大的元素Ⅱ

Leetcode 503. 下一个更大的元素Ⅱ 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序&#xff0c;这个数字之后的第一个比它…

Ltv 数据粘包处理

测试数据包的生成 校验程序处理结果和原始的日志保温解析是否一致 程序粘包分解正常

【NPS】哑终端设备如何实现域VLAN动态分配

在【NPS】微软NPS配置802.1x&#xff0c;验证域账号&#xff0c;动态分配VLAN&#xff08;有线网络续篇&#xff09;中&#xff0c;已经通过C3PL策略配置实现了802.1x验证没有通过时&#xff0c;自动分配一个Guest VLAN&#xff0c;以确保用户至少能够访问基本的网络服务。问题…

mysql学习——SQL中的DQL和DCL

SQL中的DQL和DCL DQL基本查询条件查询聚合函数分组查询排序查询分页查询 DCL管理用户权限控制 学习黑马MySQL课程&#xff0c;记录笔记&#xff0c;用于复习。 DQL DQL英文全称是Data Query Language(数据查询语言)&#xff0c;数据查询语言&#xff0c;用来查询数据库中表的记…

Windows资源管理器down了,怎么解

ctrlshiftesc 打开任务管理器 文件 运行新任务 输入 Explorer.exe 资源管理器重启 问题解决 桌面也回来了

java基于ssm+jsp 美好生活日志网

1前台首页功能模块 九宫格日志网站&#xff0c;在系统首页可以查看首页、日记信息、美食信息、景点信息、新闻推荐、日志展示、论坛信息、新闻资讯、留言反馈、我的、跳转到后台等内容&#xff0c;如图1所示。 图1前台首页功能界面图 用户注册&#xff0c;在用户注册页面可以填…

MySQL----undo log回滚日志原理、流程以及与redo log比较

回滚日志 回滚日志&#xff0c;保存了事务发生之前的数据的一个版本&#xff0c;用于事务执行时的回滚操作&#xff0c;同时也是实现多版本并发控制&#xff08;MVCC&#xff09;下读操作的关键技术。 如何理解Undo Log 事务需要保证原子性&#xff0c;也就是事务中的操作要…

【CentOS 7】深入指南:使用LVM和扩展文件系统增加root分区存储容量

【CentOS 7】深入指南&#xff1a;使用LVM和扩展文件系统增加root分区存储容量 大家好 我是寸铁&#x1f44a; 【CentOS 7】深入指南&#xff1a;使用LVM和扩展文件系统增加root分区存储容量 ✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 前言 在运行CentOS 7服务器或虚拟机时&a…