FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/450342.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【云岚到家】-day07-5-实战项目-优惠券活动-活动管理

【云岚到家】-day07-5-实战项目-优惠券活动-活动管理 2 优惠券活动管理2.1 需求分析2.1.1 **新增优惠券活动**1&#xff09;界面原型2&#xff09;数据分析3&#xff09;数据校验 2.1.2 **查询优惠券活动**1&#xff09;界面原型 2.2.3 **修改优惠券活动**1) 界面原型2&#xf…

Qt-窗口对话框QMessageBox的使用(51)

目录 前言 描述 使用 自定义按钮 简单方式创建 前言 Qt 提供了多种可复⽤的对话框类型&#xff0c;即 Qt 标准对话框。Qt 标准对话框全部继承于 QDialog类。常⽤标准对话框如下&#xff1a; 描述 消息对话框 QMessageBox 消息对话框是应⽤程序中最常⽤的界⾯元素。消息…

D3.js(五):实现组织架构图

实现组织架构图 效果初始化组织机构容器并实现缩放平移功能效果源码 渲染节点效果源码 渲染连线效果源码 完整源码 效果 初始化组织机构容器并实现缩放平移功能 效果 源码 import {useEffect} from react; import TreeData from ./json/tree-data.json;interface ITreeConfig…

crd介绍

在 Kubernetes 中&#xff0c;CRD&#xff08;Custom Resource Definition&#xff09;和 CR&#xff08;Custom Resource&#xff09;是用于扩展 Kubernetes 功能的机制。它们的关系和使用可以用一个完整案例来说明。 定义 CRD&#xff08;Custom Resource Definition&#x…

中后台 B 端产品设计

中后台 B 端产品设计 一、设计目标二、设计流程三、设计要点四、相关模块 叮嘟&#xff01;这里是小啊呜的学习课程资料整理。好记性不如烂笔头&#xff0c;今天也是努力进步的一天。一起加油进阶吧&#xff01; 中后台B端产品设计&#xff1a; 是指针对企业内部业务人员和管理…

python+appium+雷电模拟器安卓自动化及踩坑

一、环境安装 环境&#xff1a;window11 1.1 安装Android SDK AndroidDevTools - Android开发工具 Android SDK下载 Android Studio下载 Gradle下载 SDK Tools下载 这里面任选一个就可以&#xff0c;最终下载完主要要安装操作安卓的工具adb&#xff0c;安装这个步骤的前提是要…

Linux驱动开发——设备树

文章目录 1 什么是设备树&#xff1f;2 DTS、DTB和DTC3 DTS语法3.1 dtsi头文件3.2 设备节点3.3 标准属性3.4 根节点compatible属性3.5 向节点追加或修改内容 4 创建小型模板设备树5 设备树在系统中的体现6 绑定信息文档7 设备树常用OF操作函数7.1 查找节点的OF函数7.2 查找父/子…

【工具变量】上市公司当年是否发生财务重述指标整理Stata代码(2000-2023年)

计算说明&#xff1a;使用财务重述公告中所更正年报对应的年度作为财务重述的年度&#xff0c;若企业年报中发生财务重述取1&#xff0c;否则取0。本示例的财务重述是指上市公司对以前年度财务报表中的会计差错进行更正和披露&#xff0c;不包括股票拆分、股票红利、终止经营、…

Java 类和对象详解(上 )

个人主页&#xff1a; 鲤鱼王打挺-CSDN博客 Java专栏&#xff1a;https://blog.csdn.net/2401_83779763/category_12801101.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12801101&sharereferPC&sharesource2401_83779763&sharefromfrom_link &…

SwiftUI 如何取得 @Environment 中 @Observable 对象的绑定?

概述 从 SwiftUI 5.0&#xff08;iOS 17&#xff09;开始&#xff0c;苹果推出了全新的 Observation 框架。它作为下一代内容改变响应者全面参与到数据流和事件流的系统中。 有了 Observation 框架的加持&#xff0c;原本需要多种状态类型的 SwiftUI 视图现在只需要 3 种即可大…

R语言详解predict函数

R语言中predict函数在建立模型&#xff0c;研究关系时常用。但是不同type得到的结果常常被混为一谈&#xff0c;接下来&#xff0c;探讨predict得到的不同结果。 #数据 set.seed(123) n<-1000 age<-rnorm(n,mean50,sd10) gender<-rbinom(n,1,0.5) disease<-rbinom…

CDC变更数据捕捉技术是什么?和ETL有什么不同?

一、什么是CDC技术? 变更数据捕获&#xff08;Change Data Capture&#xff0c;简称 CDC&#xff09;是一种用于识别和跟踪数据源中发生变化的数据的技术。 工作原理&#xff1a; 1.监测数据源&#xff1a;CDC 工具会持续监测指定的数据源&#xff0c;如数据库表、文件系统…

【踩坑随笔】Tensorflow-GPU训练踩坑

一个无语的坑&#xff0c;4060单卡训练&#xff0c;8G内存本来就不够&#xff0c;还没开始训练就已经爆内存了&#xff0c;但是居然正常跑完了训练&#xff0c;然后一推理发现结果就是一坨。。。往回翻日志才发现原来中间有异常。 首先解决第一个问题&#xff1a;Could not lo…

k8s部署Kafka集群超详细讲解

准备部署环境 Kubernetes集群信息 NAMEVERSIONk8s-masterv1.29.2k8s-node01v1.29.2k8s-node02v1.29.2 Kafka&#xff1a;3.7.1版本&#xff0c;apche版本 Zookeeper&#xff1a;3.6.3版本 准备StorageClass # kubectl get sc NAME PROVISIONER RECLA…

音频文件处理 m4a 格式转为 wav 格式 - python 实现

在做音频算法开发时&#xff0c;有时获取的样本为 .m4a格式需要将其转为 .wav,方便之后的数据处理。 安装 python 库&#xff1a; pip install AudioSegment 代码实现具体如下&#xff1a; #-*-coding:utf-8-*- # date:2024-10 # Author: DataBall - XIAN # Function: 音频文件…

LIN从节点:波特率测试

文章目录 1、为什么需要测&#xff1f;2、如何实现测试&#xff1f;3、测试结果4、注意事项 1、为什么需要测&#xff1f; 调节波特率的变化&#xff0c;使主节点同步场位速率变化&#xff0c;验证从节点能否通过同步段进行调节自身位速率。对应ISO17987协议。 2、如何实现测…

锥线性规划【分布鲁棒、两阶段鲁棒方向知识点】

1 锥线性对偶理论 本部分看似和分布鲁棒、两阶段鲁棒优化没什么关系&#xff0c;但值得优先学习&#xff0c;原因将在最后揭晓。 二阶锥 二阶锥&#xff08;second-order cone&#xff0c;又称ice-cream/Lorentz cone&#xff09;的形式为&#xff1a; 非负象限锥 半正定锥 …

jmeter出参保存到文件,保存失败解决

1、添加JSON提取 2、添加beanshell FileWriter writer new FileWriter("C:/Users/xxx/Desktop/signUrl.csv", true); writer.write(vars.get("company_name")"\t"vars.get("signUrl")"\n"); writer.close(); 写文件的两个…

python的多线程和多进程

首先需要明确的是&#xff0c;多进程和其他语言的一样&#xff0c;能够利用多核cpu&#xff0c;但是python由于GIL的存在&#xff0c;多线程在执行的时候&#xff0c;实际上&#xff0c;每一时刻只有一个线程在执行。相当于是单线程。然而多线程在某些情况下&#xff0c;还是能…

Go-知识泛型

Go-知识泛型 1. 认识泛型1.1 不使用泛型1.2 使用泛型 2. 泛型的特点2.1 函数泛化2.2 类型泛化 3. 类型约束3.1 类型集合3.2 interface 类型集合3.2.1 内置interface类型集合3.2.2 自定义interface类型集合3.2.2.1 任意类型元素3.2.2.2 近似类型元素3.2.2.3 联合类型元素 3.2.3 …