Flink-CDC 全面解析

Flink-CDC 全面解析

一、CDC 概述

(一)什么是 CDC

CDC 即 Change Data Capture(变更数据获取),其核心要义在于严密监测并精准捕获数据库内发生的各种变动情况,像数据的插入、更新以及删除操作,还有数据表相关的变动等,都会被它一一察觉。并且它会严格按照这些变动实际发生的先后顺序,毫无遗漏地完整记录下来,随后将这些变更信息写入到消息中间件里,方便其他服务按需进行订阅与消费。形象地说,它就像是数据库的“贴身管家”,时刻留意着数据库的任何“风吹草动”,一旦有变化,立马就能获取到相应信息。

(二)CDC 的种类

CDC 主要划分为基于查询和基于 Binlog 这两种方式,下面来看看它们之间的差别:

  • 搭建 mysql 集群:在搭建时可以选择使用阿里巴巴的 mycat 来实现分库分表功能,以此更好地管理和扩展数据库架构。
  • Canal:当我们想要知晓数据库中某个表的变动情况时,Canal 就能派上用场了,它的原理是依托 mysql 的 binlog,通过解析 binlog 来获取表的变更信息。例如,要是没有 CDC 的话,若想在大屏幕上实时展示订单的统计数据,那就得利用 Canal 去读取 mysql 里的实时订单数据,然后传递给 kafka,再由 kafka 把相关信息发送给 Flink;而要是有了 CDC,Flink 就能直接检测到 mysql 数据的变化,进而得出各项指标了。值得一提的是,CDC 底层其实内置了一个软件叫 debezium。

(三)Flink-CDC

Flink 社区精心打造了 flink-cdc-connectors 组件,这可是个功能强大的 source 组件,它具备直接从 MySQL、PostgreSQL 等数据库读取全量数据以及增量变更数据的能力,极大地拓展了 Flink 与数据库交互的便捷性。而且 Flink 还有诸如 mysql、kafka、hbase、cdc 等多种连接器,其触发器方面,默认大多是基于时间的,像 eventTime、procssingTime 等,当然也支持自定义触发器,比如在智慧交通项目中就有过相关应用。目前这个组件已经开源,开源地址为 https://github.com/ververica/flink-cdc-connectors ,方便广大开发者使用和贡献代码。

二、Flink CDC 案例实操

(一)DataStream 方式的应用

  1. 导入依赖:详细的依赖导入可参考 https://blog.csdn.net/mynameisgt/article/details/125826905 这个链接内容。要是启动时报错了,也不用慌,解决方案可查看 https://blog.csdn.net/qq_27721169/article/details/132151345 ,一般来说,就是修改 mysql 的驱动包的版本就行。不过要测试相关代码,还得先开启 mysql 的 binlog 日志,具体操作就是开启 MySQL Binlog 并重启 MySQL。
  2. 编写代码:代码编写完成后,接着要进行建库、建表操作,随后开启 mysql 的 binlog,具体可参照相应文档来执行。
  3. 案例测试
    • 打包并上传至 Linux:将写好的代码打包好,然后上传到 Linux 系统中,为后续部署做准备。
    • 开启 MySQL Binlog 并重启 MySQL:再次强调这个操作的重要性,同时要查看 binlog 数据最新的大小,观察其前后变化情况,方便后续验证数据变更捕获是否准确。
    • 创建一个表:创建一个带有随便几个字段的表,用于测试 Flink-CDC 对数据变更的捕获功能。
    • 启动 Flink 集群:让 Flink 集群运行起来,为处理数据变更提供运行环境。
    • 启动 HDFS 集群:启动 HDFS 集群,保障数据存储等相关功能的正常运行。
    • 启动程序:正式启动编写好的 Flink-CDC 程序,开始检测数据变更情况。
    • 在 MySQL 的 cdc_test.z_user_info 表中添加、修改或者删除数据:人为地在指定表中制造数据变更,以此来检验 Flink-CDC 是否能准确捕获到这些变化。
    • 给当前的 Flink 程序创建 Savepoint:执行 bin/flink savepoint JobId hdfs://bigdata01:9820/flink/save 命令,创建 Savepoint,方便后续程序重启等操作时恢复状态。
    • 关闭程序以后从 Savepoint 重启程序:通过 bin/flink run -s hdfs://bigdata01:9820/flink/save/... -c com.bigdata.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar 命令,实现从 Savepoint 重启程序,验证程序的稳定性和数据处理的连贯性。

(二)自定义反序列化器

  1. 代码实现:在 Flink-CDC 中,有像 Canal、Maxwell 等相关总结内容。而且代码版本和 sql 版本存在一定区别:
    • 版本支持:代码版本的 Flink 在 1.12 和 1.13 版本都支持相关操作,然而 sql 版本的 Flink 只有到 1.13 版本才支持。
    • 监听范围:代码版本支持一次监听多个数据库以及多个表,功能更为强大;而 sql 版本则只支持单库单表的监听。
    • 反序列化器:sql 版本中无需进行自定义反序列化器,相对简洁;但代码版本就需要自定义反序列化器了,当然,也可以选择不定义,根据具体业务场景和需求来决定。

(三)FlinkSQL 方式的应用

  1. 代码实现:在代码实现过程中,有一些需要特别留意的“坑”:
    • jar 包版本问题:在 maven 中,各个 jar 包之间的版本有可能出现不兼容等问题,比如可能会出现 java.lang.NoSuchMethodError:scala.Predef$.refArrayOps 这样的错误,需要仔细排查和调整 jar 包版本。
    • 主键问题:在 FlinkSQL 里,如果创建的表没有主键,尤其是在 Flink 1.13 版本之后,会遇到 The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled', default: true (fallback keys: [])' to 'true' 这样的限制,所以创建表的时候一定要记得加上主键。详细的常见 SQL 错误内容可参考 https://help.aliyun.com/zh/flink/support/common-sql-errors?spm=a2c4g.11186623.0.i32#section-9oq-z7x-sq0 。

以下是一段示例代码:

package com.bigdata.cdc;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @基本功能:* @program:FlinkProject* @author: 闫哥* @create:2024-06-13 11:01:11**/
public class CdcSQLTest {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//2. 创建表对象//3. 编写sql语句//4. 将Table变为stream流tenv.executeSql("CREATE TABLE user_info2 (\n" +" id INT NOT NULL primary key,\n" +" name STRING,\n" +" age int\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'bigdata01',\n" +" 'port' = '3306',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'scan.startup.mode' = 'latest-offset', " +" 'database-name' = 'cdc_test',\n" +" 'table-name' = 'user_info'\n" +")");tenv.executeSql("select * from user_info2").print();Table table = tenv.sqlQuery("select * from user_info2");DataStream<Tuple2<Boolean, Row>> retractStream = tenv.toRetractStream(table, Row.class);retractStream.print();//5. execute-执行env.execute();}
}

在这段代码中,首先是准备 Flink 的运行环境,设置好运行模式以及并行度等基础参数,接着获取 StreamTableEnvironment 对象,用于后续的 SQL 操作。然后创建了名为 user_info2 的表,定义了表结构以及相关的连接配置信息,如连接的数据库、用户名、密码等,通过执行 SQL 查询语句并将结果转换为流的形式进行输出,最后执行整个程序,实现基于 FlinkSQL 方式对 Flink-CDC 的应用实践。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/505877.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

cache原理

理论基础 时间局部性空间局部性 存储结构 存储器 ROMRAM SRAM->CACHEDRAM->MEM CACHE与主存映射 直接映射 假定主存储器32位地址&#xff0c;cache行64B&#xff0c;cache容量512B&#xff0c;则cache有8行 全相联映射 假定主存储器32位地址&#xff0c;cache…

嵌入式入门Day38

C Day1 第一个C程序C中的输入输出输出操作coutcin练习 命名空间使用方法自定义命名空间冲突问题 C对字符串的扩充C风格字符串的使用定义以及初始化C风格字符串与C风格字符串的转换C风格的字符串的关系运算常用的成员变量输入方法 布尔类型C对堆区空间使用的扩充作业 第一个C程序…

流浪猫流浪狗领养PHP网站源码

源码介绍 流浪猫流浪狗领养PHP网站源码&#xff0c;适合做猫狗宠物类的发信息发布。当然其他信息发布也是可以的。 导入数据库&#xff0c;修改数据库配置/application/database.php 设置TP伪静态&#xff0c;设置运行目录&#xff0c; 后台&#xff1a;/abcd.php/dashboard?…

深度学习|表示学习|一个神经元可以干什么|02

如是我闻&#xff1a; 如果我们只有一个神经元&#xff08;即一个单一的线性或非线性函数&#xff09;&#xff0c;仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用&#xff1a; 1. 实现简单的线性分类 输入&#xff1a;一组特征向量 x x x 输出&#xff…

【Vim Masterclass 笔记09】S06L22:Vim 核心操作训练之 —— 文本的搜索、查找与替换操作(第一部分)

文章目录 S06L22 Search, Find, and Replace - Part One1 从光标位置起&#xff0c;正向定位到当前行的首个字符 b2 从光标位置起&#xff0c;反向查找某个字符3 重复上一次字符查找操作4 定位到目标字符的前一个字符5 单字符查找与 Vim 命令的组合6 跨行查找某字符串7 Vim 的增…

使用 SQL 和表格数据进行问答和 RAG(7)—将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中

将表格数据&#xff08;CSV 或 Excel 文件&#xff09;加载到向量数据库&#xff08;ChromaDB&#xff09;中。这里定义的类 PrepareVectorDBFromTabularData&#xff0c;它的主要功能是读取表格数据文件到DataFrame中、生成嵌入向量、并将这些数据存储在向量数据库的集合中&am…

【git】-2 分支管理

目录 一、分支的概念 二、查看、创建、切换分支 1、查看分支-git branch 2、创建分支- git branch 分支名 3、切换分支- git checkout 分支名 三、git指针 -实现分支和版本间的切换 四、普通合并分支 git merge 文件名 五、冲突分支合并 ​​​​​​【git】-初始gi…

搜广推面经五

饿了么推荐算法 一、介绍InfoNCE Loss、InfoNCE温度系数的作用 InfoNCE Loss&#xff08;Information Noise Contrastive Estimation Loss&#xff09;是一种常用于自监督学习和对比学习中的损失函数&#xff0c;特别是在信息论和无监督学习中有广泛应用。 它的核心思想是通过…

如何选择适合的证件照制作软件,让您的照片制作更轻松

在当今数字化的时代&#xff0c;制作证件照不再需要专门前往照相馆。选择一款合适的证件照制作软件&#xff0c;您可以在家中轻松完成标准证件照的拍摄与制作。然而&#xff0c;面对市面上琳琅满目的软件&#xff0c;找到最适合您需求的软件并不简单。本文将为您详细介绍选择证…

【数据库】一、数据库系统概述

文章目录 一、数据库系统概述1 基本概念2 现实世界的信息化过程3 数据库系统内部体系结构4 数据库系统外部体系结构5 数据管理方式 一、数据库系统概述 1 基本概念 数据&#xff1a;描述事物的符号记录 数据库&#xff08;DB&#xff09;&#xff1a;长期存储在计算机内的、…

安卓硬件加速hwui

安卓硬件加速 本文基于安卓11。 从 Android 3.0 (API 级别 11) 开始&#xff0c;Android 2D 渲染管道支持硬件加速&#xff0c;这意味着在 View 的画布上执行的所有绘图操作都使用 GPU。由于启用硬件加速所需的资源增加&#xff0c;你的应用程序将消耗更多内存。 软件绘制&am…

第R4周:LSTM-火灾温度预测

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 文章目录 一、代码流程1、导入包&#xff0c;设置GPU2、导入数据3、数据集可视化4、数据集预处理5、设置X&#xff0c;y6、划分数据集7、构建模型8、定义训练函…

Spring 设计模式:经典设计模式

Spring 设计模式&#xff1a;经典设计模式 引言 Spring 框架广泛使用了经典设计模式。 这些模式在 Spring 内部发挥着重要作用。 通过理解这些设计模式在 Spring 中的应用&#xff0c;开发者可以更深入地掌握 Spring 框架的设计哲学和实现细节。 经典设计模式 控制反转&am…

现代企业架构白皮书(可以在线阅读完整PDF文件)

数据架构元模型综述 数据架构的内容元模型包括“结构”、“端口”两个部分&#xff0c;如下图所示&#xff1a; 结构部分用来对数据模型、数据处理建模&#xff0c;其中包括数据对象、数据组件 端口部分用来对数据模型的边界建模&#xff0c;其中包括数据服务 数据架构元模型…

krpano 实现文字热点中的三角形和竖杆

krpano 实现文字热点中的三角形和竖杆 实现文字热点中的三角形和竖杆 一个后端写前端真的是脑阔疼 一个后端写前端真的是脑阔疼 一个后端写前端真的是脑阔疼 实现文字热点中的三角形和竖杆 上图看效果 v&#xff1a;2549789059

Win10本地部署大语言模型ChatGLM2-6B

鸣谢《ChatGLM2-6B&#xff5c;开源本地化语言模型》作者PhiltreX 作者显卡为英伟达4060 安装程序 打开CMD命令行&#xff0c;在D盘新建目录openai.wiki if not exist D:\openai.wiki mkdir D:\openai.wiki 强制切换工作路径为D盘的openai.wiki文件夹。 cd /d D:\openai.wik…

互联网架构变迁:从 TCP/IP “呼叫” 到 NDN “内容分发” 的逐浪之旅

本文将给出关于互联网架构演进的一个不同视角。回顾一下互联网的核心理论基础产生的背景&#xff1a; 左边是典型的集中控制通信网络&#xff0c;很容易被摧毁&#xff0c;而右边的网络则没有单点问题&#xff0c;换句话说它很难被全部摧毁&#xff0c;与此同时&#xff0c;分…

priority_queue优先队列

目录 1. 最短路径算法&#xff08;Dijkstra算法&#xff09; 应用场景&#xff1a; 优先队列的作用&#xff1a; 2. 最小生成树算法&#xff08;Prim算法&#xff09; 应用场景&#xff1a; 优先队列的作用&#xff1a; 3. 哈夫曼编码&#xff08;Huffman Coding&#x…

vs2022编译webrtc步骤

1、主要步骤说明 概述&#xff1a;基础环境必须有&#xff0c;比如git&#xff0c;Powershell这些&#xff0c;就不写到下面了。 1.1 安装vs2022 1、选择使用C的桌面开发 2、 Windows 10 SDK安装10.0.20348.0 3、勾选MFC及ATL这两项 4、 安装完VS2022后&#xff0c;必须安…

如何评价deepseek-V3 VS OpenAI o1 自然语言处理成Sql的能力

DeepSeek-V3 介绍 在目前大模型主流榜单中&#xff0c;DeepSeek-V3 在开源模型中位列榜首&#xff0c;与世界上最先进的闭源模型不分伯仲。 准备工作&#xff1a; 笔者只演示实例o1 VS DeepSeek-V3两个模型&#xff0c;大家可以自行验证结果或者实验更多场景&#xff0c;同时…