FlinkCDC 实现 MySQL 数据变更实时同步

文章目录
  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/16733.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

金融风控项目-1

文章目录 一. 案例背景介绍二. 代码实现1. 加载数据2. 数据处理3. 查询 三. 业务解读 一. 案例背景介绍 通过对业务数据分析了解信贷业务状况 数据集说明 从开源数据改造而来&#xff0c;基本反映真实业务数据销售&#xff0c;客服可以忽略账单周期&#xff0c;放款日期账单金…

CANMV K230入手体验(1)u盘安装镜像

这是安装镜像后的磁盘管理。 使用镜像文件名为&#xff1a; CanMV-K230_01Studio_micropython_v1.2.2-0-g4b8cae1_nncase_v2.9.0.img。 安装结束。 套件的sd卡损坏&#xff0c;已申请更换。 小伙伴们注意sd卡的问题&#xff0c;一个上午过去了。C... 下图是资源管理器的截…

策略模式-小结

总结一下看到的策略模式&#xff1a; A:一个含有一个方法的接口 B:具体的实行方式行为1,2,3&#xff0c;实现上面的接口。 C:一个环境类&#xff08;或者上下文类&#xff09;&#xff0c;形式可以是&#xff1a;工厂模式&#xff0c;构造器注入模式&#xff0c;枚举模式。 …

springCloud-2021.0.9 之 GateWay 示例

文章目录 前言springCloud-2021.0.9 之 GateWay 示例1. GateWay 官网2. GateWay 三个关键名称3. GateWay 工作原理的高级概述4. 示例4.1. POM4.2. 启动类4.3. 过滤器4.4. 配置 5. 启动/测试 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收…

[FastAdmin] 上传图片并加水印,压缩图片

1.app\common\library\Upload.php 文件 upload方法 /*** 普通上传* return \app\common\model\attachment|\think\Model* throws UploadException*/public function upload($savekey null){if (empty($this->file)) {throw new UploadException(__(No file upload or serv…

windows系统远程桌面连接ubuntu18.04

记录一下自己在配置过程中遇到的问题&#xff0c;记录遇到的两大坑&#xff1a; windows系统通过xrdp远程桌面连接ubuntu18.04的蓝屏问题。参考以下第一章解决。 同一局域网内网段不同的连接问题。参考以下第三章解决&#xff0c;前提是SSH可连。 1. 在ubuntu上安装xrdp 参考&…

逻辑回归不能解决非线性问题,而svm可以解决

逻辑回归和支持向量机&#xff08;SVM&#xff09;是两种常用的分类算法&#xff0c;它们在处理数据时有一些不同的特点&#xff0c;特别是在面对非线性问题时。 1. 逻辑回归 逻辑回归本质上是一个线性分类模型。它的目的是寻找一个最适合数据的直线&#xff08;或超平面&…

23页PDF | 国标《GB/T 44109-2024 信息技术 大数据 数据治理实施指南 》发布

一、前言 《信息技术 大数据 数据治理实施指南》是中国国家标准化管理委员会发布的关于大数据环境下数据治理实施的指导性文件&#xff0c;旨在为组织开展数据治理工作提供系统性的方法和框架。报告详细阐述了数据治理的实施过程&#xff0c;包括规划、执行、评价和改进四个阶…

ESM3(1)-介绍:用语言模型模拟5亿年的进化历程

超过30亿年的进化在天然蛋白质空间中编码形成了一幅生物学图景。在此&#xff0c;作者证明在进化数据上进行大规模训练的语言模型&#xff0c;能够生成与已知蛋白质差异巨大的功能性蛋白质&#xff0c;并推出了ESM3&#xff0c;这是一款前沿的多模态生成式语言模型&#xff0c;…

在大型语言模型(LLM)框架内Transformer架构与混合专家(MoE)策略的概念整合

文章目录 传统的神经网络框架存在的问题一. Transformer架构综述1.1 transformer的输入1.1.1 词向量1.1.2 位置编码&#xff08;Positional Encoding&#xff09;1.1.3 编码器与解码器结构1.1.4 多头自注意力机制 二.Transformer分步详解2.1 传统词向量存在的问题2.2 详解编解码…

【黑马点评】 使用RabbitMQ实现消息队列——3.批量获取1k个用户token,使用jmeter压力测试

【黑马点评】 使用RabbitMQ实现消息队列——3.批量获取用户token&#xff0c;使用jmeter压力测试 3.1 需求3.2 实现3.2.1 环境配置3.2.2 修改登录接口UserController和实现类3.2.3 测试类 3.3 使用jmeter进行测试3.4 测试结果3.5 将用户登录逻辑修改回去3.6 批量删除生成的用户…

【安全靶场】信息收集靶场

靶场&#xff1a;https://app.hackinghub.io/hubs/prison-hack 信息收集 子域名收集 1.subfinder files.jabprisons.com staging.jabprisons.com cobrowse.jabprisons.com a1.top.jabprisons.com cf1.jabprisons.com va.cobrowse.jabprisons.com vs.jabprisons.com c…

springboot239-springboot在线医疗问答平台(源码+论文+PPT+部署讲解等)

&#x1f495;&#x1f495;作者&#xff1a; 爱笑学姐 &#x1f495;&#x1f495;个人简介&#xff1a;十年Java&#xff0c;Python美女程序员一枚&#xff0c;精通计算机专业前后端各类框架。 &#x1f495;&#x1f495;各类成品Java毕设 。javaweb&#xff0c;ssm&#xf…

(一)获取数据和读取数据

获取公开数据 下载、爬虫、API 一些公开数据集网站&#xff1a; 爬虫&#xff1a; 发送请求获取网页源代码——解析网页源代码内容&#xff0c;提取数据 通过公开API获取&#xff1a; API定义了两个程序之间的服务合约&#xff0c;即双方是如何使用请求和响应来进行通讯的…

在MacBook Air上本地部署大模型deepseek指南

随着大模型技术的兴起&#xff0c;越来越多的人开始关注如何在本地部署这些强大的AI模型。如果你也想体验大模型的魅力&#xff0c;那么这篇文章将指导你如何在你的MacBook Air上本地部署大模型. 工具准备 为了实现本地部署&#xff0c;你需要以下工具&#xff1a; Ollama&a…

Windows中使用Docker安装Anythingllm,基于deepseek构建自己的本地知识库问答大模型,可局域网内多用户访问、离线运行

文章目录 Windows中使用Docker安装Anythingllm&#xff0c;基于deepseek构建自己的知识库问答大模型1. 安装 Docker Desktop2. 使用Docker拉取Anythingllm镜像2. 设置 STORAGE_LOCATION 路径3. 创建存储目录和 .env 文件.env 文件的作用关键配置项 4. 运行 Docker 命令docker r…

git学习【个人记录b站尚硅谷】

git学习 Git基本命令操作设置用户签名初始化本地库添加文件从工作区到暂存区将文件从暂存区添加到本地库修改文件重新提交 Git分支Github操作创建远程库上传到远程库克隆到本地文件夹拉取远程库最新版本到本地 总结 Git基本命令操作 设置用户签名 git config --global user.n…

【R语言】t检验

一、基本介绍 t检验&#xff08;t-test&#xff09;是用于比较两个样本均值是否存在显著差异的一种统计方法。 t.test()函数的调用格式&#xff1a; t.test(x, yNULL, alternativec("two.sided", "less", "greater"), mu0, pairFALSE, var.eq…

TDengine 产品由哪些组件构成

目 录 背景产品生态taosdtaosctaosAdaptertaosKeepertaosExplorertaosXtaosX Agent应用程序或第三方工具 背景 了解一个产品&#xff0c;最好从了解产品包括哪些内容开始&#xff0c;我这里整理了一份儿 TDegnine 产品包括有哪些组件&#xff0c;每个组件作用是什么的说明&a…

实现限制同一个账号最多只能在3个客户端(有电脑、手机等)登录(附关键源码)

如上图&#xff0c;我的百度网盘已登录设备列表&#xff0c;有一个手机&#xff0c;2个windows客户端。手机设备有型号、最后登录时间、IP等。windows客户端信息有最后登录时间、操作系统类型、IP地址等。这些具体是如何实现的&#xff1f;下面分别给出android APP中采集手机信…