Flink CDC Basics and MySQL-to-MySQL Synchronization

Contents

Welcome to Flink CDC

Core Concepts
  Data Pipeline
  Data Source
  Data Sink
  Table ID
  Transform
  Route

Connectors
  Pipeline Connectors
  Supported Connectors
  Developing Your Own Connector

Flink Sources
  Supported Connectors
  Supported Flink Versions
  Features

MySQL-to-MySQL Synchronization
  DataStream Implementation
    Required Dependencies (pom.xml)
    Preparation
    Code
    Testing
  SQL Implementation
    Required Dependencies (pom.xml)
    Code
    Testing


This article is based on Flink CDC v2.4.2 and Flink 1.17.1.

Welcome to Flink CDC

Flink CDC is a streaming data integration tool that aims to provide users with a more powerful API. It lets users describe their ETL pipeline logic elegantly in YAML, automatically generates the corresponding Flink operators, and submits the job for them. Flink CDC prioritizes optimizing the task submission process and provides enhanced capabilities such as schema evolution, data transformation, full-database synchronization, and exactly-once semantics.
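To make the YAML description concrete, here is a minimal pipeline definition sketch in the style of the Flink CDC 3.x quickstart (the hostnames, credentials, and table patterns below are placeholders, not taken from this article's setup):

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: pass
  tables: app_db.\.*

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2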

Deeply integrated with and powered by Apache Flink, Flink CDC provides:

✅ An end-to-end data integration framework
✅ An API that makes it easy for users to build integration jobs
✅ Multi-table support on both source and sink
✅ Synchronization of entire databases
✅ Schema evolution capability


Core Concepts

Data Pipeline

Since events in Flink CDC flow from upstream to downstream in a pipeline fashion, the whole ETL task is called a data pipeline.

Data Source

A data source is used to access metadata and read the changed data from an external system. One data source can read from multiple tables at the same time.

Note that the data source here does not refer to the external system itself; it is Flink's own source definition, which Flink uses to read the changed data from the external system.

Data Sink

A data sink is used to apply schema changes and write the changed data to an external system. One data sink can write to multiple tables at the same time.

Table ID

When connecting to an external system, a mapping must be established to the external system's storage objects, and each storage object must be uniquely identified; this is what the table ID is for. To be compatible with most external systems, a table ID is represented by a 3-tuple: (namespace, schemaName, tableName). A connector should map table IDs to storage objects in the external system. The following table lists the table ID components for different data systems:

Data system              Table ID components        Example
Oracle/PostgreSQL        database, schema, table    mydb.default.orders
MySQL/Doris/StarRocks    database, table            mydb.orders
Kafka                    topic                      orders

Transform

The Transform module helps users drop and extend data columns based on a table's columns. It can also filter out unnecessary data during synchronization.
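As a rough sketch of what a transform rule looks like in the 3.x YAML pipeline syntax (the table and column names here are hypothetical):

transform:
  - source-table: cdc_demo.human
    projection: id, name, UPPER(name) AS name_upper
    filter: age > 18
    description: keep only adults and add an upper-cased name column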

Route

A route specifies rules that match a list of source tables and map them to target tables. The most typical scenario is merging sharded databases and tables, routing multiple upstream source tables to the same target table.
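A sketch of a route rule in the same YAML style (table names hypothetical), merging sharded tables into one target:

route:
  - source-table: app_db.order_\.*
    sink-table: ods_db.ods_orders
    description: merge sharded order tables into a single target table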


Connectors

Connectors are covered in two chapters here, so it is worth clarifying how connector, source, and sink relate. Both sources and sinks can be called connectors; put differently, the term connector covers both. For historical reasons, source and sink came first, and connector was later adopted as the unified term for the two.

Pipeline Connectors

Flink CDC provides several source and sink connectors for interacting with external systems. You can use them out of the box by adding the released jars to your Flink CDC environment and specifying the connector in the YAML pipeline definition.

Supported Connectors

Connector       Type     External systems
Apache Doris    Sink     • Apache Doris: 1.2.x, 2.x.x
Kafka           Sink     • Kafka
MySQL           Source   • MySQL: 5.6, 5.7, 8.0.x
                         • RDS MySQL: 5.6, 5.7, 8.0.x
                         • PolarDB MySQL: 5.6, 5.7, 8.0.x
                         • Aurora MySQL: 5.6, 5.7, 8.0.x
                         • MariaDB: 10.x
                         • PolarDB X: 2.0.1
Paimon          Sink     • Paimon: 0.6, 0.7, 0.8
StarRocks       Sink     • StarRocks: 2.x, 3.x

Developing Your Own Connector

If the provided connectors do not meet your requirements, you can develop your own connector to bring your external system into Flink CDC pipelines. Check the Flink CDC APIs to learn how to develop your own connector.

Flink Sources

Flink CDC sources are a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). Some of the CDC sources integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities. Learn more about what Debezium is.


Supported Connectors

Connector       Database                                       Driver
mongodb-cdc     • MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1             MongoDB Driver: 4.9.1
mysql-cdc       • MySQL: 5.6, 5.7, 8.0.x                       JDBC Driver: 8.0.28
                • RDS MySQL: 5.6, 5.7, 8.0.x
                • PolarDB MySQL: 5.6, 5.7, 8.0.x
                • Aurora MySQL: 5.6, 5.7, 8.0.x
                • MariaDB: 10.x
                • PolarDB X: 2.0.1
oceanbase-cdc   • OceanBase CE: 3.1.x, 4.x                     OceanBase Driver: 2.4.x
                • OceanBase EE: 2.x, 3.x, 4.x
oracle-cdc      • Oracle: 11, 12, 19, 21                       Oracle Driver: 19.3.0.0
postgres-cdc    • PostgreSQL: 9.6, 10, 11, 12, 13, 14          JDBC Driver: 42.5.1
sqlserver-cdc   • Sqlserver: 2012, 2014, 2016, 2017, 2019      JDBC Driver: 9.4.1.jre8
tidb-cdc        • TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0      JDBC Driver: 8.0.27
db2-cdc         • Db2: 11.5                                    Db2 Driver: 11.5.0.0
vitess-cdc      • Vitess: 8.0.x, 9.0.x                         MySql JDBC Driver: 8.0.26

Supported Flink Versions

Flink CDC version    Supported Flink versions
1.0.0                1.11.*
1.1.0                1.11.*
1.2.0                1.12.*
1.3.0                1.12.*
1.4.0                1.13.*
2.0.*                1.13.*
2.1.*                1.13.*
2.2.*                1.13.*, 1.14.*
2.3.*                1.13.*, 1.14.*, 1.15.*, 1.16.*
2.4.*                1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
3.0.*                1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*

Features

1. Supports reading database snapshots and keeps reading binlogs with exactly-once processing even after failures.
2. CDC connectors for the DataStream API: users can consume changes from multiple databases and tables in a single job, without deploying Debezium and Kafka (see the sketch after this list).
3. CDC connectors for the Table/SQL API: users can use SQL DDL to create a CDC source that monitors changes on a single table.
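For feature 2, here is a sketch of a single source watching several databases and tables at once, using the same MySqlSource builder that appears later in this article (db1, db2, and the table names are hypothetical):

// One CDC source consuming changes from multiple databases and tables.
MySqlSource<String> multiTableSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("db1", "db2")                 // databaseList accepts multiple databases
        .tableList("db1.orders", "db2.customers")   // table names must be fully qualified
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();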

The following table shows the current features of each connector:

Connector        Lock-free read    Parallel read    Exactly-once read    Incremental snapshot read
mongodb-cdc
mysql-cdc
oracle-cdc
postgres-cdc
sqlserver-cdc
oceanbase-cdc
tidb-cdc
db2-cdc
vitess-cdc

(The per-connector check marks in the original table were rendered as images; refer to the official Flink CDC documentation for the full feature matrix.)

MySQL-to-MySQL Synchronization

DataStream Implementation

Required Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.leboop.www</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CDC for MySQL -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- JSON parsing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version> <!-- use a newer version if preferred -->
        </dependency>
    </dependencies>
</project>

If a dependency is missing, you may see an error like this:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitterat com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitterat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more

Adding the following dependency fixes it:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

If you see the following error:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)

Add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>

Preparation

For demonstration purposes, MySQL 8.0.30 was installed locally on Windows.

Prepare two databases, one as the source and one as the sink for this example.

The table creation statements are as follows:


CREATE TABLE `human` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `human_sink` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

MySQL CDC requires the binlog to be enabled. Run the following SQL to check whether it is:

SHOW VARIABLES LIKE 'log_bin';

If Value is ON, the binlog is enabled.
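If it shows OFF, the binlog can usually be turned on in the MySQL server configuration (my.cnf / my.ini) followed by a restart. A typical snippet (values are illustrative; on MySQL 8.0 the binlog is already enabled by default):

[mysqld]
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL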

Code

package com.leboop.cdc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        // Flink source; the source type is MySQL.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(80)
                .databaseList("cdc_demo")
                .tableList("cdc_demo.human")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId("1")
                .build();

        // Initialize the environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing.
        env.enableCheckpointing(3000);

        DataStreamSource<String> stringDataStreamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 1 parallel source task
                .setParallelism(1);

        // Print the data to the console.
        stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink

        // Synchronize the data to MySQL.
        stringDataStreamSource.addSink(new RichSinkFunction<String>() {
            private Connection connection = null;
            private PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                if (connection == null) {
                    Class.forName("com.mysql.cj.jdbc.Driver"); // load the JDBC driver
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root"); // open the connection
                    connection.setAutoCommit(false); // disable auto-commit
                }
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String op = jsonObject.getString("op");
                if ("r".equals(op)) { // initial full snapshot
                    System.out.println("Truncating the target table");
                    connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // clear the target table
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has executed
                } else if ("c".equals(op)) { // insert
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has executed
                } else if ("d".equals(op)) { // delete
                    JSONObject before = jsonObject.getJSONObject("before");
                    Integer id = before.getInteger("id");
                    preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
                    preparedStatement.setInt(1, id);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has executed
                } else if ("u".equals(op)) { // update
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
                    preparedStatement.setString(1, name);
                    preparedStatement.setInt(2, age);
                    preparedStatement.setInt(3, id);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has executed
                } else {
                    System.out.println("Unsupported operation op=" + op);
                }
            }

            @Override
            public void close() throws Exception {
                System.out.println("Executing close()");
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

(1) The Flink source

The following code connects to the local MySQL database cdc_demo:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(80)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("1")
        .build();

new JsonDebeziumDeserializationSchema() deserializes the MySQL binlog records that are read into JSON strings, which you can see later in the console output.

(2) Server ID

Every MySQL database client used for reading the binlog should have a unique ID, called a server ID. The MySQL server uses this ID to maintain the network connection and the binlog position. If different jobs share the same server ID, reading may start from the wrong binlog position. It is therefore recommended to set a different server ID for each reader; for example, if the source parallelism is 4, we can use '5401-5404' to assign a unique server ID to each of the 4 source readers.

(3) Reading data from the MySQL source

DataStreamSource<String> stringDataStreamSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 1 parallel source task
        .setParallelism(1);

This code reads from the MySQL source with a read parallelism of 1. If the parallelism were 4, the source above would need 4 server IDs, for example "1-4".
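A parallel setup would look roughly like this (a sketch, not the configuration used in this article; it reuses the env variable from the demo code above):

// Sketch: 4 source readers, each taking a unique server id from the range.
MySqlSource<String> parallelSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("5401-5404") // one id per source reader
        .build();

DataStreamSource<String> parallelStream = env
        .fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        .setParallelism(4); // matches the size of the server-id range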

(4) Printing the data read from MySQL to the console

// Print the data to the console.
stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink

This is only to see what the binlog records look like once converted to JSON strings. Three such records are shown below:

{"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
{"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
{"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}

The first JSON record, formatted, looks like this:

{"before": null,"after": {"id": 7,"name": "lisi","age": 23},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 0,"snapshot": "false","db": "cdc_demo","sequence": null,"table": "human","server_id": 0,"gtid": null,"file": "","pos": 0,"row": 0,"thread": null,"query": null},"op": "r","ts_ms": 1722218198388,"transaction": null
}

Here, before is the row before the operation and after is the row after it. op is the operation type:

  • "op": "d" 代表删除操作

  • "op": "u" 代表更新操作

  • "op": "c" 代表新增操作

  • "op": "r" 代表全量读取,而不是来自 binlog 的增量读取

For example, the first record above comes from the initial full snapshot of the human table in the cdc_demo database, so before is null, after carries the row data, and op is r. Similarly, the second record is a delete (its op value is d), and the third is an update (its op value is u).

(5) The sink

The MySQL sink is implemented here as an anonymous inner subclass of RichSinkFunction.

Testing

First, insert 2 rows into the human table:

insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
insert into cdc_demo.human(id,name,age) values(2,"lisi",23);

Then start the program. The output log is as follows:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
Jul 29, 2024 10:16:42 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
Truncating the target table
Truncating the target table

Looking at the human_sink table, the two rows from the human table have been synchronized.
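A quick query against the sink database confirms it:

select * from cdc_sink.human_sink;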

Next, execute the following update, delete, and insert statements:

update cdc_demo.human set age = 10 where id = 1;
delete from cdc_demo.human where id = 2;
insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);

The output log is as follows:

{"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
{"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
{"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}

In the end, the two tables contain the same data.

SQL Implementation

Required Dependencies (pom.xml)

On top of the DataStream dependencies, the following are also needed:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>

If you see the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)

Add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

If you see the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.Table options are:'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)at scala.collection.Iterator.foreach(Iterator.scala:929)at scala.collection.Iterator.foreach$(Iterator.scala:929)at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)at scala.collection.IterableLike.foreach(IterableLike.scala:71)at scala.collection.IterableLike.foreach$(IterableLike.scala:70)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike.map(TraversableLike.scala:234)at scala.collection.TraversableLike.map$(TraversableLike.scala:227)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackhole
datagen
mysql-cdc
printat org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)... 21 more

Add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>

Code

package com.leboop.cdc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCSqlDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);

        // Source table.
        TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
                "id BIGINT ,\n" +
                "name STRING ,\n" +
                "age INT ,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '80',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'cdc_demo',\n" +
                " 'table-name' = 'human') ");
        // Print the source table result.
        createSourceTable.print();
        System.out.println("Source table created");

        // Sink table.
        TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
                "id BIGINT ," +
                "name STRING ," +
                "age INT ," +
                "PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'table-name' = 'human_sink' )");
        createSinkTable.print();
        System.out.println("Sink table created");

        // Insert: this statement performs the synchronization.
        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
        System.out.println("Insert into sink table submitted");
    }
}

(1) Creating the source table

The following code creates the source table in Flink. Why "in Flink"? Because the code maps the MySQL human table to the Flink table flink_human, which the rest of the code then refers to:

// Source table.
TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
        "id BIGINT ,\n" +
        "name STRING ,\n" +
        "age INT ,\n" +
        "PRIMARY KEY (id) NOT ENFORCED \n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '80',\n" +
        " 'username' = 'root',\n" +
        " 'password' = 'root',\n" +
        " 'database-name' = 'cdc_demo',\n" +
        " 'table-name' = 'human') ");

Note that the connector here must be mysql-cdc.

(2) Creating the sink table

The code is as follows:

// Sink table.
TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
        "id BIGINT ," +
        "name STRING ," +
        "age INT ," +
        "PRIMARY KEY(id) NOT ENFORCED " +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'username' = 'root'," +
        " 'password' = 'root'," +
        " 'table-name' = 'human_sink' )");

Here the connector value must be jdbc, i.e., the sink is implemented through the JDBC connector.

(3) Synchronizing the data

The following SQL statement performs the synchronization:

tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
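Note that executeSql submits the INSERT job asynchronously and returns immediately, which is why the final println in the demo fires right away. If you want main() to block until the streaming job terminates, one option (a sketch, not part of the original code) is to wait on the returned TableResult:

// Block on the streaming insert job; it normally runs until cancelled.
TableResult insertResult = tableEnv.executeSql(
        "insert into flink_human_sink select id,name,age from flink_human");
insertResult.await(); // returns when the job finishes, throws if it fails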

Testing

The testing procedure is the same as for the DataStream implementation.


One caveat: insert, update, and delete operations on MySQL are synchronized, but some operations are not, truncate for example. A likely reason is that truncate is written to the binlog as a DDL statement rather than as row-level change events, so the CDC source does not emit row changes for it by default.
