flink-connector-mysql-cdc:01 mysql-cdc础配置代码演示

flink-connector-mysql-cdc:

  • 01 mysql-cdc基础配置代码演示
  • 02 mysql-cdc高级扩展
  • 03 mysql-cdc常见问题汇总
  • 04 mysql-cdc-kafka生产级代码分享
  • 05 flink-kafka-doris生产级代码分享
  • 06 flink-kafka-hudi生产级代码分享

flink-cdc版本:3.2.0

flink版本:flink-1.18.0

mysql版本:8.0.26

java版本:1.8

maven版本:3.8.4

目录

1. Mysql数据库设置

1.1 开启binlog日志

1.2 创建用户

1.3 准备测试数据 

2. 编写测试代码

2.1 maven 依赖

2.2 测试代码

3. mysql-cdc扩展

3.1 时区设置

3.2 为每个读取器设置不同的 SERVER ID


1. Mysql数据库设置

1.1 开启binlog日志

编辑 MySQL 配置文件

  • 在 Unix/Linux 系统中,通常是 /etc/my.cnf 或 /etc/mysql/my.cnf

  • 在 Windows 上,可能位于 C:\ProgramData\MySQL\MySQL Server X.Y\my.ini

# 在 mysqld 部分下添加以下内容(如果已经存在,请确认其值):
[mysqld]
log-bin=mysql-bin  # 二进制日志文件前缀,MySQL将生成名为 mysql-bin.000001, mysql-bin.000002 等的文件。
binlog-format=row   # 设置二进制日志格式为行级(row),可选值为 STATEMENT、ROW 和 MIXED;这里推荐使用行级。
expire_logs_days=7  # 设置二进制日志的过期时间,单位为天,超过这个天数后的日志将被自动删除,这里以 7 天为例。
max-binlog-size=100M # 设置单个二进制日志文件的最大大小,超出后将自动创建一个新的日志文件(可以根据需要调整)。

1.2 创建用户

以 “flinkcdc”用户为例

# 创建 MySQL 用户:
CREATE USER 'flinkcdc'@'localhost' IDENTIFIED BY '123456';
# 向用户授予所需的权限:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY '123456';
# 授权所有权限
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'localhost';
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'%';
# 完成用户的权限:
FLUSH PRIVILEGES;

1.3 准备测试数据 

# 使用flinkcdc用户登录数据库# 创建测试数据库
create database cdc_demo;
# 创建测试表
CREATE TABLE cdc_demo.flink_cdc_test (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50) NOT NULL,description TEXT,age INT,balance DECIMAL(10, 2),is_active BOOLEAN DEFAULT TRUE,created_at DATETIME DEFAULT CURRENT_TIMESTAMP,birth_date DATE,long_value BIGINT,last_login TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 插入测试数据
INSERT INTO cdc_demo.flink_cdc_test (name, description, age, balance, is_active, created_at, birth_date, long_value, last_login) VALUES
('Alice Smith', 'Alice is a software engineer with 5 years of experience.', 30, 2500.50, TRUE, '2023-01-01 10:00:00', '1992-05-15', 12345678901234, '2023-05-20 10:00:00'),
('Bob Johnson', 'Bob enjoys hiking and outdoor activities.', 25, 1500.00, TRUE, '2023-02-15 12:30:00', '1998-08-22', 987654321054321, '2023-05-18 14:00:00'),
('Charlie Brown', 'Charlie is an avid reader and coffee lover.', 35, 3200.75, FALSE, '2023-03-22 14:45:00', '1988-01-11', 135792468012345, '2023-05-19 09:20:00'),
('Daisy Miller', 'Daisy loves painting and traveling.', 28, 1800.25, TRUE, '2023-04-05 09:15:00', '1994-11-03', 24681357901234, '2023-05-21 12:30:00'),
('Ethan White', 'Ethan enjoys playing guitar and writing songs.', 40, 5000.00, TRUE, '2023-05-18 16:20:00', '1983-07-30', 98765432102468, '2023-05-22 15:00:00');

2. 编写测试代码

2.1 maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toroidal</groupId><artifactId>flink-connector-mysql-cdc-demo</artifactId><name>flink-connector-mysql-cdc-demo</name><version>1.0-SNAPSHOT</version><repositories><repository><id>aliyunmaven</id><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public/</url></repository><repository><id>mirrorId</id><name>Human Readable Name for this Mirror.</name><url>http://my.repository.com/repo/path</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><flinkcdc.version>3.2.0</flinkcdc.version><mysql.version>8.0.26</mysql.version><log4j.version>2.17.1</log4j.version><lombok.version>1.18.24</lombok.version><fastjson.version>1.2.83</fastjson.version></properties><dependencies><!-- flink Dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><!-- mysql-cdc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flinkcdc.version}</version></dependency><!-- mysql --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- log --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.9.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 设置jar包的入口类(可选) --><mainClass>com.toroidal.mysql.MysqlCdcStreamApp</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

2.2 测试代码

package com.toroidal.mysql;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;/*** @Author Toroidal* @Date 2024/12/04 14:42* @Version 1.0*/
public class MysqlCdcStreamApp {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(4).print().setParallelism(1);env.execute("Print MySQL Snapshot + Binlog");}
}

运行结果 

3. mysql-cdc扩展

3.1 时区设置

mysql-cdc读取出来的 timestamp 字段时区相差8小时,将时区和MySQL服务器时区设置一致即可:

查询当前数据库时区:

SELECT * FROM mysql.time_zone_name;

设置时区为东八时区

.serverTimeZone("Asia/Shanghai")

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test").serverTimeZone("Asia/Shanghai")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

3.2 为每个读取器设置不同的 SERVER ID

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 ID,称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 ID,则可能会导致从错误的 binlog 位置读取。 

.serverId("flink-cdc-01")
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306)// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc.flink_cdc_test").username("flinkcdc").serverTimeZone("Asia/Shanghai").serverId("flink-cdc-01").password("123456")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484018.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++数据结构算法复习基础--11--高级排序算法-快速排序-归并排序-堆排序

高阶排序 1、快速排序 冒泡排序的升级算法 每次选择一个基准数&#xff0c;把小于基准数的放到基准数的左边&#xff0c;把大于基准数的放到基准数的右边&#xff0c;采用 “ 分治算法 ”处理剩余元素&#xff0c;直到整个序列变为有序序列。 最好和平均的复杂度&#xff1a…

分类算法中的样本不平衡问题及其解决方案

一、样本不平衡问题概述 在机器学习的分类任务中&#xff0c;样本不平衡是指不同类别训练样本数量存在显著差异的现象。这一差异会给模型训练和性能评估带来挑战&#xff0c;尤其在处理少数类样本时&#xff0c;模型可能难以有效学习其特征。 以二分类为例&#xff0c;理想情况…

GPS模块/SATES-ST91Z8LR:电路搭建;直接用电脑的USB转串口进行通讯;模组上报定位数据转换地图识别的坐标手动查询地图位置

从事嵌入式单片机的工作算是符合我个人兴趣爱好的,当面对一个新的芯片我即想把芯片尽快搞懂完成项目赚钱,也想着能够把自己遇到的坑和注意事项记录下来,即方便自己后面查阅也可以分享给大家,这是一种冲动,但是这个或许并不是原厂希望的,尽管这样有可能会牺牲一些时间也有哪天原…

AVL树:自平衡的二叉搜索树

AVL树是一种自平衡的二叉搜索树&#xff08;BST&#xff09;。它最早由G.M. Adelson-Velsky和E.M. Landis在1962年提出。AVL树的特点是任何节点的两个子树的高度最多相差1&#xff0c;这确保了树的平衡性&#xff0c;从而保证了树的操作&#xff08;如查找、插入和删除&#xf…

AMR移动机器人赋能制造业仓储自动化升级

在当今制造业的激烈竞争中&#xff0c;智能化、数字化已成为企业转型升级的关键路径。一家制造业巨头&#xff0c;凭借其庞大的生产体系和多个仓库资源&#xff0c;正以前所未有的决心和行动力&#xff0c;在制造业智能化浪潮中勇立潮头&#xff0c;开启了降本增效的新篇章。这…

IIC相关介绍及oled实验(二)

//模块&#xff1a;OLED显示屏 1. 0.96寸OLED屏幕介绍 0.96 寸 4P OLED 屏幕模块是一种显示屏模块&#xff0c;它包括一个 0.96 英寸的 OLED 显示屏和四个引脚。这种 OLED 屏幕模块通常用于嵌入式系统和小型电子设备中&#xff0c;可以显示文本、图像和其他类型的信息。由于其…

Python 数据分析用库 获取数据(二)

Beautiful Soup Python的Beautiful Soup&#xff08;常被称为“美丽汤”&#xff09;是一个用于解析HTML和XML文档的第三方库&#xff0c;它在网页爬虫和数据提取领域具有广泛的应用。 作用 HTML/XML解析&#xff1a; Beautiful Soup能够解析HTML和XML文档&#xff0c;包括不…

数据链路层(四)---PPP协议的工作状态

1 PPP链路的初始化 通过前面几章的学习&#xff0c;我们学了了PPP协议帧的格式以及组成&#xff0c;那么对于使用PPP协议的链路是怎么初始化的呢&#xff1f; 当用户拨号上网接入到ISP后&#xff0c;就建立起了一条个人用户到ISP的物理链路。这时&#xff0c;用户向ISP发送一…

QT 实现QStackedWidget切换页面右移动画

1.实现效果 以下是一个QStackedWidget,放了两个QPushButton在上面,点击切换不同的界面。 为了方便查看动画特效,设置了每个界面的背景图片。 2.实现思路 首先截取当前界面的图片,渲染到一个QLabel上,然后设置QPropertyAnimation动画,动画的作用对象就是这个QLabel,不断…

开源C代码之路:一、Gitee

开源c代码之路&#xff1a;一&#xff0c;Gitee 前言1、开源项目2、从哪里找&#xff1f;3、举个例子4、总结&#xff1a; 本系列回顾清单开源代码示例 前言 从开源开发的角度&#xff0c;由浅入深&#xff0c;一步步初探C语言编程的入门之路。 本篇讲解&#xff1a;Gitee 1…

系统思考—战略决策

最近与一位企业创始人深入交流&#xff0c;聊到了他这几年来的多次尝试与探索。回顾过去&#xff0c;他尝试了很多方向&#xff0c;投入了大量的精力与资源&#xff0c;但今天他却感到&#xff0c;无论哪个业务模块&#xff0c;都没有真正突破&#xff0c;原本的业务也未见明显…

视频监控汇聚平台Liveweb视频安防监控实时视频监控系统操作方案

Liveweb国标GB28181视频平台是一种基于国标GB/T28181协议的安防视频流媒体能力平台。它支持多种视频功能&#xff0c;包括实时监控直播、录像、检索与回看、语音对讲、云存储、告警以及平台级联等功能。该平台部署简单、可扩展性强&#xff0c;支持全终端、全平台分发接入的视频…

HTML 添加 文本水印

body,html {margin: 0;height: 100vh;width: 100vw;} // 自定义文案const setting {text: "水印文案", // 水印内容innerDate: true, // 在水印下方增加日期width: 110, // 水印宽度};// 自定义文字水印const watermark (function () {return {build: function (a…

计算机的错误计算(一百七十四)

摘要 探讨 MATLAB 关于计算机的错误计算&#xff08;一百七十三&#xff09;中多项式的秦九韶&#xff08;或Horner&#xff09;形式的计算误差。 在计算机的错误计算&#xff08;一百七十三&#xff09;中&#xff0c;我们讨论了一个多项式的计算误差。本节探讨其对应秦九韶&…

扩展 SOC 的能力以应对更多威胁

过去 18 个月中&#xff0c;安全运营的一个主要趋势是注重“用更少的资源做更多的事情”。这种趋势一直延续到今天&#xff0c;尽管这一挑战并非安全领域独有——许多其他技术子领域也感受到了同样的压力——但安全领域可能是最不能有效吸收这种额外工作量的领域。 Exabeam 和…

堆叠的简析

堆叠 堆叠的概念 堆叠是指将一台以上的交换机组合起来共同工作&#xff0c;以便在有限的空间内提供尽可能多的端口。‌ 堆叠技术可以通过专用连接电缆将多台交换机连接成一个堆叠单元&#xff0c;从而增加端口密度和管理效率。‌12 堆叠与级联有所不同。级联的交换机之间可以…

vulnhub靶场之momentum-2

前言 靶机采用virtual box虚拟机&#xff0c;桥接网卡 攻击采用VMware虚拟机&#xff0c;桥接网卡 靶机&#xff1a;momentum-2 192.168.1.40 攻击&#xff1a;kali 192.168.1.16 主机发现 使用arp-scan -l扫描 信息收集 使用namp扫描 这里的命令对目标进行vulner中的漏…

【Elasticsearch】实现分布式系统日志高效追踪

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

【0x3D】HCI_Remote_Host_Supported_Features_Notification事件详解

目录 一、事件概述 二、事件格式及参数说明 2.1. HCI_Remote_Host_Supported_Features_Notification事件格式 2.2. BD_ADDR 2.3. Remote_Host_Supported_Features 三、事件作用 3.1. 设备特性沟通与理解 3.2. 功能协商与性能优化 3.3. 设备管理与配置更新 四、应用场…

【C++】栈和队列的模拟实现(适配器模式)

不论是C语言还是C&#xff0c;我们都用其对应的传统写法对栈和队列进行了模拟实现&#xff0c;现在我们要用新的方法模拟实现栈和队列&#xff0c;这个新方法就是适配器模式。 C语言传统写法&#xff1a; C语言模拟实现栈 C传统写法&#xff1a;C模拟实现栈 1.容器适配器 …