Flink安装和Flink CDC实现数据同步

 一,Flink 和Flink CDC

1, Flink

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。

中文文档 Apache Flink Documentation | Apache Flink

官方文档 :https://flink.apache.org
Flink 中文社区视频课程:https://github.com/flink-china/flink-training-course
Flink 中文社区 :https://www.slidestalk.com/FlinkChina
ververica 教程 :https://training.ververica.com/
ververica 教程中文文档:https://ci.apache.org/projects/flink/flink-docs-master/zh/
源码:https://github.com/apache/flink
Flink 知识图谱 https://developer.aliyun.com/article/744741

2,Flink CDC

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。

目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:

  •  数据迁移:常用于数据库备份、容灾等;
  •  数据分发:将一个数据源分发给多个下游,常用于业务解耦、微服务;
  •  数据采集:将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。

目前业界主流的 CDC 实现机制可以分为两种:

2.1 基于查询的 CDC

  • 离线调度查询作业,批处理。依赖表中的更新时间字段,每次执行查询去获取表中最新的数据;
  • 无法捕获删除事件,从而无法保证数据一致性;
  • 无法保障实时性,基于离线调度存在天然的延迟。

2.2 基于日志的 CDC

  • 实时消费日志,流处理。例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把binlog 文件当作流的数据源;
  • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
  • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

2.3 主流CDC技术对比

1) DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但在场景支持上仍不完善。

2) 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。

3) 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此 Flink CDC 作为Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构,在大数据场景下容易面临性能瓶颈的问题。

4) 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联打宽。Flink CDC 依托强大的 Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。

5) 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统中,也支持灵活的自定义 connector。

二,安装Flink

1,为了运行Flink,只需提前安装好 Java 11或以上版本

java -version

2,下载Flink

下载地址:Index of /dist/flinkicon-default.png?t=O83Ahttps://archive.apache.org/dist/flink

可以找到你要安装的版本

wget https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz

3,解压

 tar -xzf flink-1.19.1-bin-scala_2.12.tgz

4,进入到flink-1.19.1目录,启动集群

./bin/start-cluster.sh

5,如果想访问WebUI,可以看下面

【问题解决】Flink在linux上运行成功但是无法访问webUI界面_flink web ui的ip访问-CSDN博客

6,停止集群

./bin/stop-cluster.sh

三,运行Flink示例

在examples目录下有很多示例,可以试着运行

1,运行单词统计示例

./bin/flink run examples/streaming/WordCount.jar

进入到log目录,查看日志

tail -f flink-root-taskexecutor-0-rocky8-template.out

四,运行Flink CDC示例

1,通过编写sql脚本来实现同步数据

SET execution.checkpointing.interval = 5s;drop table if exists  order_01;
CREATE TABLE order_01 (id INT NOT NULL,`order_id`  VARCHAR,amount VARCHAR,remark VARCHAR,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'x.x.x.x','port' = '3306','username' = 'root','password' = 'p4ssword','database-name' = 'account','server-time-zone' = 'UTC','table-name' = 'order_01');drop table if exists  order_02;
CREATE TABLE order_02 (id INT NOT NULL,`order_id`  VARCHAR,amount VARCHAR,remark VARCHAR,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://x.x.x.x:3306/account?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC','username' = 'root','password' = 'p4ssword','table-name' = 'order_02','driver' = 'com.mysql.cj.jdbc.Driver','scan.fetch-size' = '200');INSERT INTO order_02
SELECT id,order_id,amount,remark
FROM order_01;

编写以上脚本,命名为flinkCdc2Mysql.sql,上传到flink的sql目录下,这里的sql是我新建的,你可以自己指定。

需要下载flink-connecter和flink-sql-connector包

下载地址:Central Repository: com/ververica

 通过以下命令执行

./bin/sql-client.sh -f ./sql/flinkCdc2Mysql.sql

就可以完成数据从order_01表同步到order_02表。

2, 编写Java类,编译成jar,执行jar来实现数据同步

AccountVoucherSumaryDWSSQL类
package com.xxx.demoimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @date 2024/10/31 下午3:57*/
public class AccountVoucherSumaryDWSSQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(5000);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);tEnv.executeSql("CREATE DATABASE IF NOT EXISTS account");// 动态表,此为source表tEnv.executeSql("CREATE TABLE account.order_01 (\n" +"    id                           INT,\n" +"    order_id                     VARCHAR,\n" +"    amount                       VARCHAR,\n" +"    remark                       VARCHAR,\n" +"    PRIMARY KEY (id) NOT ENFORCED\n" +") WITH (\n" +"  'connector' = 'mysql-cdc',\n" +"  'hostname' = 'x.x.x.x',\n" +"  'port' = '3306',\n" +"  'username' = 'root',\n" +"  'password' = 'p4ssword',\n" +"  'database-name' = 'account',\n" +"  'table-name' = 'order_01',\n" +"  'server-time-zone' = 'UTC',\n" +"  'scan.incremental.snapshot.enabled' = 'false'\n" +")");// 动态表,此为sink表。sink表和source表的connector不一样tEnv.executeSql("CREATE TABLE account.order_02 (\n" +"    id                           INT,\n" +"    order_id                     VARCHAR,\n" +"    amount                       VARCHAR,\n" +"    remark                       VARCHAR,\n" +"    PRIMARY KEY (id) NOT ENFORCED\n" +") WITH (\n" +"  'connector' = 'jdbc',\n" +"  'url' = 'jdbc:mysql://x.x.x.x:3306/account?useSSL=false&allowPublicKeyRetrieval=true&connectionTimeZone=UTC',\n" +"  'username' = 'root',\n" +"  'password' = 'p4ssword',\n" +"  'table-name' = 'order_02',\n" +"  'sink.buffer-flush.max-rows' = '1',\n" +"  'sink.buffer-flush.interval' = '1s',\n" +"  'sink.max-retries' = '3' \n"+")");tEnv.executeSql("INSERT INTO account.order_02 (id, order_id, amount, remark)\n" +" select t1.id,\n" +"       t1.order_id,\n" +"       t1.amount,\n" +"       t1.remark\n" +" from account.order_01 t1 \n");env.execute();}
}

pom文件
 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xxx.demo</groupId><artifactId>FlinkDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>8</java.version><flink.version>1.19.1</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.7</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><!-- Test dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.15</artifactId><version>1.2.1</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>3.0.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>8</source><target>8</target></configuration></plugin><!-- 打fatjar配置 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><!--这里指定要运行的main类--><mainClass>com.xxx.demo.AccountVoucherSumaryDWSSQL</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id> <!-- 此处指定继承合并 --><phase>package</phase> <!-- 绑定到打包阶段 --><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

完成之后,通过maven来打包。

上传包到flink下examples目录下,这里也新建一个目录CDC

通过以下命令来运行

./bin/flink run examples/cdc/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

上面两种运行方式运行之后,都可以去Flink的Web页面查看到运行的任务。

五,小结

上面只是搭建了Flink,并运行了示例项目,初步体验了一下Flink的功能,还完全到不了实际项目中运用的程度。后续会一步步去探索Flink在项目中应用的技巧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/468119.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VBA高级应用30例应用3在Excel中的ListObject对象:插入行和列

《VBA高级应用30例》&#xff08;版权10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以…

windows运行ffmpeg的脚本报错:av_ts2str、av_ts2timestr、av_err2str => E0029 C4576

问题描述 我目前的环境是&#xff1a; 编辑器&#xff1a; Microsoft Visual Studio Community 2022 (64 位) 运行的脚本是ffmpeg自带的remux样例&#xff0c;只不过我想用c语言执行这个样例。在执行的过程中报错如下图&#xff1a; C4576 后跟初始值设定项列表的带圆括…

如何利用 Python 的爬虫技术获取淘宝天猫商品的价格信息?

以下是使用 Python 的爬虫技术获取淘宝天猫商品价格信息的两种常见方法&#xff1a; 方法一&#xff1a;使用 Selenium 一、环境准备&#xff1a; 安装 selenium 库&#xff1a;在命令行中运行 pip install selenium。下载浏览器驱动&#xff1a;如 ChromeDriver&#xff08;确…

Linux系统程序设计--2. 文件I/O

文件I/O 标准C的I/O FILE结构体 下面只列出了5个成员 可以观察到&#xff0c;有些函数没有FILE类型的结构体指针例如printf主要是一些标准输出&#xff0c;因为其内部用到了stdin&#xff0c;stdout&#xff0c;stderr查找文件所在的位置:find \ -name stat.h查找头文件所…

Spark 中 RDD 的诞生:原理、操作与分区规则

Spark 的介绍与搭建&#xff1a;从理论到实践-CSDN博客 Spark 的Standalone集群环境安装与测试-CSDN博客 PySpark 本地开发环境搭建与实践-CSDN博客 Spark 程序开发与提交&#xff1a;本地与集群模式全解析-CSDN博客 Spark on YARN&#xff1a;Spark集群模式之Yarn模式的原…

[2024最新] macOS 发起 Bilibili 直播(不使用 OBS)

文章目录 1、B站账号 主播认证2、开启直播3、直播设置添加素材、隐私设置指定窗口添加/删除 窗口 4、其它说明官方直播帮助中心直播工具教程 目前搜到的 macOS 直播教程都比较古早&#xff0c;大部分都使用 OBS&#xff0c;一番探索下来&#xff0c;发现目前已经不需要 OBS了&a…

内核设备树,你真的了解吗?

在嵌入式系统和内核开发中&#xff0c;设备树&#xff08;Device Tree, 简称 DT&#xff09;扮演着至关重要的角色&#xff0c;帮助系统在启动时准确识别硬件配置并匹配合适的驱动程序。虽然设备树应用广泛&#xff0c;但其结构、工作机制及应用细节却不总是被深入理解。本文将…

yelp数据集上识别潜在的热门商家

yelp数据集是研究B2C业态的一个很好的数据集&#xff0c;要识别潜在的热门商家是一个多维度的分析过程&#xff0c;涉及用户行为、商家特征和社区结构等多个因素。从yelp数据集里我们可以挖掘到下面信息有助于识别热门商家 用户评分和评论分析 评分均值: 商家的平均评分是反映其…

Mac如何将多个pdf文件归并到一个

电脑&#xff1a;MacBook Pro M1 操作方式&#xff1a; very easy 选中想要归并的所有pdf文件&#xff0c;然后 右键 -> quick actions -> Create PDF 然后就可以看到将所选pdf文件归并为一个pdf的文件了

华为eNSP实验:IP Source Guard

一&#xff1a;IP Source Guard: IP Source Guard&#xff08;简称IPSG&#xff09;是一种基于二层接口的源IP地址过滤技术&#xff0c;用于防止恶意主机伪造合法主机的IP地址进行网络攻击。以下是对IP Source Guard的详细解析&#xff1a; 基本概念&#xff1a; IP Source Gu…

API接口精准获取商品详情信息案例

在当今数字化时代&#xff0c;电子商务平台的蓬勃发展&#xff0c;使得商品信息的获取变得尤为重要。API&#xff08;Application Programming Interface&#xff0c;应用程序编程接口&#xff09;作为连接前端用户界面与后端服务的桥梁&#xff0c;扮演着至关重要的角色。本文…

比流计算资源效率最高提升 1000 倍,“增量计算”新模式能否颠覆数据分析?

作者 | 关涛 云器科技CTO 数据平台领域发展 20 年&#xff0c;逐渐成为每个企业的基础设施。作为一个进入“普惠期”的领域&#xff0c;当下的架构已经完美了吗&#xff0c;主要问题和挑战是什么&#xff1f;在 2023 年 AI 跃变式爆发的大背景下&#xff0c;数据平台又该如何演…

牧神记开分9.7,2024新国漫巅峰出现了

现在国漫越来越卷了&#xff0c;卷播放量也卷评分。最近&#xff0c;b站上线不久的国漫《牧神记》开分9.7&#xff0c;口碑还是相当不错的&#xff0c;已经和《凡人修仙传》评分齐平。这部国漫仅仅播出4集&#xff0c;为什么就能获得这么高的评分呢&#xff1f;下面就一起来看看…

MeterSphere接口自动化-ForEach循环

接口自动化场景&#xff1a;一个接口根据不同的参数取值来运行测试&#xff0c;本场景中只有一个参数来去不同值。举例如下&#xff1a; https:://test.csdn/query?placementList1接口&#xff0c;测试id1,2,3时&#xff0c;断言接口返回的data数据都有返回。&#xff08;当然…

ServletContext介绍

文章目录 1、ServletContext对象介绍1_方法介绍2_用例分析 2、ServletContainerInitializer1_整体结构2_工作原理3_使用案例 3、Spring案例源码分析1_注册DispatcherServlet2_注册配置类3_SpringServletContainerInitializer 4_总结 ServletContext 表示上下文对象&#xff0c;…

聚观早报 | 比亚迪腾势D9登陆泰国;苹果 iOS 18.2 将发布

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 11月5日消息 比亚迪腾势D9登陆泰国 苹果 iOS 18.2 将发布 真我GT7 Pro防尘防水细节 小米15 Ultra最快明年登场 …

【Linux系统编程】第四十四弹---从TID到线程封装:全面掌握线程管理的核心技巧

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】 目录 1、tid是什么 1.1、理解库 1.2、理解tid 1.3、tid中线程局部存储 2、封装线程 2.1、基本结构 2.2、函数实现 2.3、使用…

智慧城市路面垃圾识别系统产品介绍方案

方案介绍 智慧城市中的路面垃圾识别算法通常基于深度学习框架&#xff0c;这些算法因其在速度和精度上的优势而被广泛采用。这些模型能够通过训练识别多种类型的垃圾&#xff0c;包括塑料袋、纸屑、玻璃瓶等。系统通过训练深度学习模型&#xff0c;使其能够识别并定位多种类型…

大模型人工智能课程全栈完整学习路径

嘿&#xff0c;朋友们&#xff0c;今天我们聊点高级的——大模型人工智能课程的全栈学习路径。不过别慌&#xff0c;虽然听起来高大上&#xff0c;但咱们慢慢来。从零开始&#xff0c;一步步带你走进这个神奇的世界。喝杯咖啡&#xff0c;穿上最舒适的拖鞋&#xff0c;准备好踏…

「Mac畅玩鸿蒙与硬件32」UI互动应用篇9 - 番茄钟倒计时应用

本篇将带你实现一个番茄钟倒计时应用&#xff0c;用户可以设置专注时间和休息时间的时长&#xff0c;点击“开始专注”或“开始休息”按钮启动计时&#xff0c;应用会在倒计时结束时进行提醒。番茄钟应用对于管理时间、提升工作效率非常有帮助&#xff0c;并且还会加入猫咪图片…