10 Flink CDC

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/11092.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Redis】set 和 zset 类型的介绍和常用命令

1. set 1.1 介绍 set 类型和 list 不同的是&#xff0c;存储的元素是无序的&#xff0c;并且元素不允许重复&#xff0c;Redis 除了支持集合内的增删查改操作&#xff0c;还支持多个集合取交集&#xff0c;并集&#xff0c;差集 1.2 常用命令 命令 介绍 时间复杂度 sadd …

happytime

happytime 一、查壳 无壳&#xff0c;64位 二、IDA分析 1.main 2.cry函数 总体&#xff1a;是魔改的XXTEA加密 在main中可以看到被加密且分段的flag在最后的循环中与V6进行比较&#xff0c;刚好和上面v6数组相同。 所以毫无疑问密文是v6. 而与flag一起进入加密函数的v5就…

【etcd】二进制安装etcd

由于生产服务器不能使用yum 安装 etcd ,或者 安装的etcd 版本比较老&#xff0c;这里介绍一个使用二进制安装的方式。 根据安装文档编写一个下载脚本即可 &#xff1a; 指定 etcd 的版本 提供了两个下载地址 一个 Google 一个 Github&#xff0c; 不过都需要外网 注释掉删除保…

MediaPipe与YOLO已训练模型实现可视化人脸和手势关键点检测

项目首页 - ZiTai_YOLOV11:基于前沿的 MediaPipe 技术与先进的 YOLOv11 预测试模型&#xff0c;精心打造一款强大的实时检测应用。该应用无缝连接摄像头&#xff0c;精准捕捉画面&#xff0c;能即时实现人脸检测、手势识别以及骨骼关键点检测&#xff0c;将检测结果实时、直观地…

学术总结Ai Agent中firecrawl(大模型爬虫平台)的超简单的docker安装方式教程

之前开源了学术总结ai agent&#xff0c;但是对非计算机专业来说&#xff0c;门槛有点高&#xff0c;再加上docker hub镜像被屏蔽&#xff0c;更是不容易上手啊。也有考虑用dify或者扣子去复刻一个&#xff0c;但是从专业用户的角度出发通过界面来拖拽配置实在是不高效&#xf…

交易股指期货有什么技巧吗?

交易股指期货有啥窍门呢&#xff1f;其实啊&#xff0c;追涨杀跌这招&#xff0c;虽然能挣点小钱&#xff0c;但风险也不小&#xff0c;一不小心就可能亏大了。我说的追涨杀跌&#xff0c;不是那种天天追着价格跑的小打小闹&#xff0c;而是要看大趋势&#xff0c;做宏观操作。…

Java线程认识和Object的一些方法ObjectMonitor

专栏系列文章地址&#xff1a;https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标&#xff1a; 要对Java线程有整体了解&#xff0c;深入认识到里面的一些方法和Object对象方法的区别。认识到Java对象的ObjectMonitor&#xff0c;这有助于后面的Synchron…

linux 函数 sem_init () 信号量、sem_destroy()

&#xff08;1&#xff09; &#xff08;2&#xff09; 代码举例&#xff1a; #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #include <unistd.h>sem_t semaphore;void* thread_function(void* arg) …

ComfyUI中For Loop的使用

研究了半天&#xff0c;终于弄明白了如何使用For Loop。 1、在For中节点&#xff0c;必须有输出连接到For Loop End的initial_value点&#xff0c;才能确保节点执行完毕后才 进入下一轮循环&#xff0c;否则&#xff0c;可能导致节点没执行完&#xff0c;就进入下一个循环了。…

UbuntuWindows双系统安装

做系统盘&#xff1a; Ubuntu20.04双系统安装详解&#xff08;内容详细&#xff0c;一文通关&#xff01;&#xff09;_ubuntu 20.04-CSDN博客 ubuntu系统调整大小&#xff1a; 调整指南&#xff1a; 虚拟机中的Ubuntu扩容及重新分区方法_ubuntu重新分配磁盘空间-CSDN博客 …

ASP.NET Core 启动并提供静态文件

ASP.NET Core 启动并提供静态文件 即是单个可执行文件&#xff0c;它既运行 API 项目&#xff0c;也托管 前端项目&#xff08;通常是前端的发布文件&#xff09;。 这种方式一般是通过将 前端项目 的发布文件&#xff08;例如 HTML、CSS、JavaScript&#xff09;放入 Web AP…

网络原理(3)—— 传输层详解

目录 一. 再谈端口号 二. UDP协议(用户数据报协议) 2.1 UDP协议端格式 2.2 UDP报文长度 2.3 UDP校验和 三. TCP协议(传输控制协议) 3.1 TCP协议段格式 3.2 核心机制 3.2.1 确认应答 —— “感知对方是否收到” 3.2.2 超时重传 3.3.3 连接管理 —— 三次握手与四…

【算法设计与分析】实验7:复杂装载及0/1背包问题的回溯法设计与求解

目录 一、实验目的 二、实验环境 三、实验内容 四、核心代码 五、记录与处理 六、思考与总结 七、完整报告和成果文件提取链接 一、实验目的 针对复杂装载问题、及0/1背包问题开展分析、建模、评价&#xff0c;算法设计与优化&#xff0c;并进行编码实践。 理解复杂装载…

oracle: 多表查询之联合查询[交集intersect, 并集union,差集minus]

把多个查询结果上下合并, 即, 通过操作符将多个 SELECT 语句的结果集合并为一个结果集。虽然联合查询通常用于从多个表中检索数据&#xff0c;但它也可以用于从同一个表中检索不同的数据集。 联合查询: 交集,并集,差集 默认的排序规则通常是基于查询结果集中的列的自然顺序。…

增删改查(CRUD)操作

文章目录 MySQL系列&#xff1a;1.CRUD简介2.Create(创建)2.1单行数据全列插入2.2 单行数据指定插入2.3 多⾏数据指定列插⼊ 3.Retrieve(读取)3.1 Select查询3.1.1 全列查询3.1.2 指定列查询3.1.3 查询字段为表达式&#xff08;都是临时表不会对原有表数据产生影响&#xff09;…

早期车主告诉后来者,很后悔买电车,一辈子都被车企拿捏了

从2015年开始大力发展电车&#xff0c;至今已有快10年了&#xff0c;头几批车主或是已换车&#xff0c;或是准备换车&#xff0c;他们用车这么多年的困扰以及换车的麻烦&#xff0c;却告诉准备买电车的消费者&#xff0c;电车没有媒体宣传的那么好&#xff0c;买了电车基本上一…

架构技能(四):需求分析

需求分析&#xff0c;即分析需求&#xff0c;分析软件用户需要解决的问题。 需求分析的下一环节是软件的整体架构设计&#xff0c;需求是输入&#xff0c;架构是输出&#xff0c;需求决定了架构。 决定架构的是软件的所有需求吗&#xff1f;肯定不是&#xff0c;真正决定架构…

H264原始码流格式分析

1.H264码流结构组成 H.264裸码流&#xff08;Raw Bitstream&#xff09;数据主要由一系列的NALU&#xff08;网络抽象层单元&#xff09;组成。每个NALU包含一个NAL头和一个RBSP&#xff08;原始字节序列载荷&#xff09;。 1.1 H.264码流层次 H.264码流的结构可以分为两个层…

pytorch生成对抗网络

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 生成对抗网络&#xff08;GAN&#xff0c;Generative Adversarial Network&#xff09;是一种深度学习模型&#xff0c;由两个神经网络组成&#xff1a;生成器&#xff08;Generator&#xff09;和判别器&#xff0…

AIGC技术中常提到的 “嵌入转换到同一个向量空间中”该如何理解

在AIGC&#xff08;人工智能生成内容&#xff09;技术中&#xff0c;“嵌入转换到同一个向量空间中”是一个核心概念&#xff0c;其主要目的是将不同类型的输入数据&#xff08;如文本、图像、音频等&#xff09;映射到一个统一的连续向量空间中&#xff0c;从而实现数据之间的…