Table of Contents
Welcome to Flink CDC
Core Concepts
Data Pipeline
Data Source
Data Sink
Table ID
Transform
Route
Connectors
Pipeline Connectors
Supported Connectors
Developing Your Own Connector
Flink Sources
Flink CDC Sources
Supported Connectors
Supported Flink Versions
Features
Syncing MySQL to MySQL
DataStream Implementation
Required Dependencies (pom.xml)
Preparation
Code
Testing
SQL Implementation
Required Dependencies (pom.xml)
Code
Testing
This article is based on Flink CDC v2.4.2 and Flink 1.17.1.
Welcome to Flink CDC
Flink CDC is a streaming data integration tool that aims to give users a more powerful API. It lets users describe their ETL pipeline logic elegantly in YAML, and it automatically generates the corresponding Flink operators and submits the job. Flink CDC puts optimizing the job submission process first and provides enhanced capabilities such as schema evolution, data transformation, full database synchronization, and exactly-once semantics.
Deeply integrated with and powered by Apache Flink, Flink CDC provides:
✅ An end-to-end data integration framework
✅ An API that makes it easy to build data integration jobs
✅ Multi-table support on the source and sink side
✅ Synchronization of entire databases
✅ Schema evolution
Core Concepts
Data Pipeline
Because events in Flink CDC flow from upstream to downstream in a pipeline, the whole ETL task is called a Data Pipeline.
Data Source
A data source is used to access metadata and read the changed data from an external system. A data source can read from multiple tables at the same time.
Note that "data source" here does not refer to the external system itself; it is the source defined inside Flink, which Flink uses to read the changed data from the external system.
Data Sink
A data sink is used to apply schema changes and write change data to an external system. A data sink can write to multiple tables at the same time.
Table ID
When connecting to an external system, a mapping to the external system's storage objects has to be established, and each storage object must be identified uniquely; this is what the Table ID is for. To stay compatible with most external systems, a table ID is represented as a 3-tuple: (namespace, schemaName, tableName). A connector should map table IDs to the storage objects of its external system. The table below lists the table ID parts for different data systems:
Data System | Table ID Parts | Example |
---|---|---|
Oracle/PostgreSQL | database, schema, table | mydb.default.orders |
MySQL/Doris/StarRocks | database, table | mydb.orders |
Kafka | topic | orders |
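As a purely illustrative sketch (the helper class below is hypothetical and not part of the Flink CDC API), the three rows above can be reproduced by rendering only the tuple parts a given system actually uses:

// Hypothetical helper, for illustration only: render a (namespace, schemaName, tableName)
// tuple the way different external systems identify their storage objects.
public class TableIdExample {
    static String render(String namespace, String schemaName, String tableName) {
        StringBuilder sb = new StringBuilder();
        if (namespace != null) sb.append(namespace).append('.');
        if (schemaName != null) sb.append(schemaName).append('.');
        return sb.append(tableName).toString();
    }

    public static void main(String[] args) {
        System.out.println(render("mydb", "default", "orders")); // Oracle/PostgreSQL: mydb.default.orders
        System.out.println(render("mydb", null, "orders"));      // MySQL/Doris/StarRocks: mydb.orders
        System.out.println(render(null, null, "orders"));        // Kafka: orders
    }
}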
Transform
The Transform module helps users drop and expand data columns based on the columns of a table. It can also help users filter out unnecessary data during synchronization.
Route
A route specifies the rules for matching a list of source tables and mapping them to target tables. The most typical scenario is merging sharded databases and sharded tables, routing multiple upstream source tables to the same target table.
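As a rough illustration of the idea (plain Java, not the actual pipeline route syntax; the table names are made up), a route rule is essentially a pattern over source table IDs plus one target table ID, e.g. merging sharded tables into a single target table:

import java.util.regex.Pattern;

// Illustrative only: a route rule expressed as "source-table pattern -> target table".
public class RouteRuleSketch {
    public static void main(String[] args) {
        Pattern sourcePattern = Pattern.compile("mydb\\.orders_\\d+"); // matches the sharded source tables
        String targetTable = "mydb.orders";                            // all matches are routed to one target table

        for (String table : new String[]{"mydb.orders_01", "mydb.orders_02", "mydb.users"}) {
            String routedTo = sourcePattern.matcher(table).matches() ? targetTable : table;
            System.out.println(table + " -> " + routedTo);
        }
    }
}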
Connectors
Connectors are split into two chapters here, so it is worth clarifying the difference between connector, source, and sink: both sources and sinks can be called connectors. Put differently, connectors cover both sources and sinks; for historical reasons sources and sinks came first, and "connector" was later adopted as the umbrella term for the two.
Pipeline Connectors
Flink CDC provides several source and sink connectors for interacting with external systems. You can use them out of the box by adding the released jars to your Flink CDC environment and specifying the connector in the YAML pipeline definition.
Supported Connectors
Connector | Supported Connector Type |
---|---|
Apache Doris | Sink |
Kafka | Sink |
MySQL | Source |
Paimon | Sink |
StarRocks | Sink |
Developing Your Own Connector
If the provided connectors do not meet your requirements, you can develop your own connector to bring your external system into Flink CDC pipelines. Check the Flink CDC APIs to learn how to develop your own connector.
Flink Sources
Flink CDC Sources
Flink CDC sources are a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). Some of the CDC sources integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities. See the Debezium documentation to learn more about what Debezium is.
Supported Connectors
Connector | Database | Driver |
---|---|---|
mongodb-cdc | MongoDB | MongoDB Driver: 4.9.1 |
mysql-cdc | MySQL | JDBC Driver: 8.0.28 |
oceanbase-cdc | OceanBase | OceanBase Driver: 2.4.x |
oracle-cdc | Oracle | Oracle Driver: 19.3.0.0 |
postgres-cdc | PostgreSQL | JDBC Driver: 42.5.1 |
sqlserver-cdc | SQL Server | JDBC Driver: 9.4.1.jre8 |
tidb-cdc | TiDB | JDBC Driver: 8.0.27 |
db2-cdc | Db2 | Db2 Driver: 11.5.0.0 |
vitess-cdc | Vitess | MySql JDBC Driver: 8.0.26 |
Supported Flink Versions
Flink CDC Version | Flink Version |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.*, 1.14.* |
2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
3.0.* | 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* |
Features
1. Supports reading database snapshots and keeps reading binlogs with exactly-once processing even after failures.
2. CDC connectors for the DataStream API, so users can consume changes from multiple databases and tables in a single job without deploying Debezium and Kafka (see the sketch after the table below).
3. CDC connectors for the Table/SQL API, so users can create a CDC source with SQL DDL to monitor changes on a single table.
The table below shows the current features of the connectors:
Connector | Lock-free Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read |
---|---|---|---|---|
mongodb-cdc | ✅ | ✅ | ✅ | ✅ |
mysql-cdc | ✅ | ✅ | ✅ | ✅ |
oracle-cdc | ✅ | ✅ | ✅ | ✅ |
postgres-cdc | ✅ | ✅ | ✅ | ✅ |
sqlserver-cdc | ✅ | ✅ | ✅ | ✅ |
oceanbase-cdc | ❌ | ❌ | ❌ | ❌ |
tidb-cdc | ✅ | ❌ | ✅ | ❌ |
db2-cdc | ✅ | ✅ | ✅ | ✅ |
vitess-cdc | ✅ | ❌ | ✅ | ❌ |
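For example, feature 2 in the list above means that a single MySqlSource can subscribe to several databases and tables at once. A minimal sketch, reusing the same builder API as the demo later in this article (host, port, credentials, and table names here are placeholders):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: one CDC source consuming change events from several databases/tables in a single job.
public class MultiTableCdcSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)                                       // placeholder connection settings
                .databaseList("db_orders", "db_users")            // multiple databases
                .tableList("db_orders.orders", "db_users.users")  // tables qualified as database.table
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Multi-table MySQL Source").print();
        env.execute("Multi-table CDC sketch");
    }
}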
Syncing MySQL to MySQL
DataStream Implementation
Required Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.leboop.www</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CDC for MySQL -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- JSON parsing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version> <!-- use the latest version if needed -->
        </dependency>
    </dependencies>
</project>
If a dependency is missing, you may see an error like this:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
	at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 1 more
Adding the following dependency fixes it:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>
If you see the following error:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
	at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
	at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)
add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
Preparation
For this demo, MySQL 8.0.30 was installed locally on Windows.
Prepare two databases to act as the source and the sink for this example.
The table creation statements are as follows:
CREATE TABLE `human` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `human_sink` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
MySQL CDC requires the binlog to be enabled. Run the following SQL to check whether the binlog is enabled:
SHOW VARIABLES LIKE 'log_bin';
If Value is ON, the binlog is enabled.
Code
package com.leboop.cdc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        // Flink source: a MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(80)
                .databaseList("cdc_demo")
                .tableList("cdc_demo.human")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId("1")
                .build();
        // Initialize the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpoint
        env.enableCheckpointing(3000);
        DataStreamSource<String> stringDataStreamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 1 parallel source tasks
                .setParallelism(1);
        // Print the records to the console
        stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink
        // Synchronize the data to MySQL
        stringDataStreamSource.addSink(new RichSinkFunction<String>() {
            private Connection connection = null;
            private PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                if (connection == null) {
                    Class.forName("com.mysql.cj.jdbc.Driver"); // load the JDBC driver
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root"); // open the connection
                    connection.setAutoCommit(false); // disable auto-commit; commit manually below
                }
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String op = jsonObject.getString("op");
                if ("r".equals(op)) { // initial snapshot read
                    System.out.println("Truncating sink table");
                    // Note: this clears the sink table for every snapshot record before re-inserting it
                    connection.prepareStatement("truncate table cdc_sink.human_sink").execute();
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has executed
                } else if ("c".equals(op)) { // insert
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit();
                } else if ("d".equals(op)) { // delete
                    JSONObject before = jsonObject.getJSONObject("before");
                    Integer id = before.getInteger("id");
                    preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
                    preparedStatement.setInt(1, id);
                    preparedStatement.execute();
                    connection.commit();
                } else if ("u".equals(op)) { // update
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
                    preparedStatement.setString(1, name);
                    preparedStatement.setInt(2, age);
                    preparedStatement.setInt(3, id);
                    preparedStatement.execute();
                    connection.commit();
                } else {
                    System.out.println("Unsupported op=" + op);
                }
            }

            @Override
            public void close() throws Exception {
                System.out.println("close() called");
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });
        env.execute("Print MySQL Snapshot + Binlog");
    }
}
(1) The Flink source
The following code connects to the local MySQL database cdc_demo:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(80)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("1")
        .build();
new JsonDebeziumDeserializationSchema() deserializes the MySQL binlog records into JSON strings, as the console output shown later demonstrates.
(2) Server id
Every MySQL database client used for reading the binlog should have a unique id, called the server id. The MySQL server uses this id to maintain the network connection and the binlog position. If different jobs share the same server id, they may read from the wrong binlog position. It is therefore recommended to give each reader a different server id; for example, with a source parallelism of 4 we can use '5401-5404' to assign a unique server id to each of the 4 source readers.
(3) Reading data from the MySQL source
DataStreamSource<String> stringDataStreamSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 1 parallel source tasks
        .setParallelism(1);
This code reads data from the MySQL source with a read parallelism of 1. If the parallelism were 4, four server ids would be needed above, e.g. "1-4".
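Putting server id and parallelism together, here is a sketch of a 4-way parallel source (a fragment that reuses env and the connection settings from the demo above; the server-id range must contain at least as many ids as the source parallelism):

// Sketch: 4 parallel source readers, each claiming one id from the server-id range.
MySqlSource<String> parallelSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(80)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("5401-5404")   // 4 server ids for 4 source readers
        .build();

DataStreamSource<String> parallelStream = env
        .fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "MySQL Source (parallel)")
        .setParallelism(4);      // must not exceed the number of server ids above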
(4) Printing the data read from MySQL to the console
// Print the records to the console
stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink
This is only to show what the binlog records look like after they have been converted to JSON strings. Three such records are shown below:
{"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
{"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
{"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}
The first JSON record, pretty-printed, looks like this:
{"before": null,"after": {"id": 7,"name": "lisi","age": 23},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 0,"snapshot": "false","db": "cdc_demo","sequence": null,"table": "human","server_id": 0,"gtid": null,"file": "","pos": 0,"row": 0,"thread": null,"query": null},"op": "r","ts_ms": 1722218198388,"transaction": null
}
Here, before is the data before the operation and after is the data after the operation. op is the operation type:
- "op": "d" means a delete
- "op": "u" means an update
- "op": "c" means an insert
- "op": "r" means a snapshot (full) read, not an incremental read from the binlog
For example, the first record above is the JSON-formatted binlog data produced by the initial full snapshot of the human table in the cdc_demo database, so before is null, after holds the row, and op is r. The second record is a delete (op is d), and the third is an update (op is u).
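For reference, here is a compact sketch of the dispatch the sink below performs on op (a fragment using the same fastjson calls as the full demo; value is the JSON string emitted by the source):

// Sketch of the op dispatch: pick the row image that matters for each operation type.
JSONObject event = JSON.parseObject(value);
String op = event.getString("op");
switch (op) {
    case "r": // snapshot read: the row is in "after"
    case "c": // insert: the new row is in "after"
    case "u": // update: the new values are in "after"
        JSONObject after = event.getJSONObject("after");
        // ... write after.getInteger("id"), after.getString("name"), after.getInteger("age") to the sink table
        break;
    case "d": // delete: the old row is in "before"
        JSONObject before = event.getJSONObject("before");
        // ... delete the sink row whose id equals before.getInteger("id")
        break;
    default:
        System.out.println("Unsupported op=" + op);
}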
(5) The sink
The MySQL sink is implemented here as an anonymous RichSinkFunction subclass.
Testing
First insert 2 rows into the human table with the following SQL:
insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
insert into cdc_demo.human(id,name,age) values(2,"lisi",23);
Then start the program. It logs the following output:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
Jul 29, 2024 10:16:42 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
Truncating sink table
Truncating sink table
Looking at the human_sink table, the two rows from the human table have been synchronized.
Next, run the following update, delete, and insert SQL statements:
update cdc_demo.human set age = 10 where id = 1;
delete from cdc_demo.human where id = 2;
insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);
The output log is as follows:
{"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
{"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
{"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}
In the end, the two tables contain the same data.
SQL Implementation
Required Dependencies (pom.xml)
On top of the DataStream dependencies, the following are also required:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>
If you see the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
	at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
	at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)
add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
If you see the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.
Table options are:
'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
	at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
	at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
	... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
mysql-cdc
print
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
	... 21 more
add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>
Code
package com.leboop.cdc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCSqlDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);

        // Source table
        TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
                "id BIGINT ,\n" +
                "name STRING ,\n" +
                "age INT ,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '80',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'cdc_demo',\n" +
                " 'table-name' = 'human') ");
        // Print the result of creating the source table
        createSourceTable.print();
        System.out.println("Source table created");

        // Sink table
        TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
                "id BIGINT ," +
                "name STRING ," +
                "age INT ," +
                "PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'table-name' = 'human_sink' )");
        createSinkTable.print();
        System.out.println("Sink table created");

        // Synchronize the data
        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
        System.out.println("Insert into sink table submitted");
    }
}
(1) Creating the source table
The following code creates the source table in Flink. Why "in Flink"? Because this code maps the human table in MySQL to the flink_human table in Flink, so the rest of the code can work with flink_human:
// Source table
TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
        "id BIGINT ,\n" +
        "name STRING ,\n" +
        "age INT ,\n" +
        "PRIMARY KEY (id) NOT ENFORCED \n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '80',\n" +
        " 'username' = 'root',\n" +
        " 'password' = 'root',\n" +
        " 'database-name' = 'cdc_demo',\n" +
        " 'table-name' = 'human') ");
Note that the connector here must be mysql-cdc.
(2) Creating the sink table
The code is as follows:
// Sink table
TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
        "id BIGINT ," +
        "name STRING ," +
        "age INT ," +
        "PRIMARY KEY(id) NOT ENFORCED " +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'username' = 'root'," +
        " 'password' = 'root'," +
        " 'table-name' = 'human_sink' )");
Here the connector value must be jdbc, i.e. the sink is implemented through the JDBC connector.
(3) Synchronizing the data
Data synchronization is achieved with the following SQL:
tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
Testing
The testing procedure is the same as for the DataStream approach.
Note that insert, update, and delete operations on MySQL are synchronized, but some operations are not, for example truncate: TRUNCATE is logged as a DDL statement rather than as row-level changes, so the CDC source does not emit change records for it.