数据实时获取方案之Flink CDC

目录

  • 一、方案描述
  • 二、Flink CDC
    • 1.1 什么是CDC
    • 1.2 什么是Flink CDC
    • 1.3 其它CDC
    • 1.4 FlinkCDC所支持的数据库情况
  • 二、使用Pipeline连接器实时获取数据
    • 2.1 环境介绍
    • 2.2 相关版本信息
    • 2.3 详细步骤
      • 2.3.1 实时获取MySQL数据并发送到Kafka
      • 2.3.2 实时获取MySQL数据并同步到Doris数据库

一、方案描述

在这里插入图片描述

由Flink CDC来监测到源数据库数据变更并将其发送到Kafka或同步到目标数据库中,再由后续消费者或其它应用来使用数据。

二、Flink CDC

1.1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 什么是Flink CDC

官方文档地址:[项目介绍 | Apache Flink CDC](Introduction | Apache Flink CDC)
官方描述:Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

1.3 其它CDC

在这里插入图片描述

1.4 FlinkCDC所支持的数据库情况

在这里插入图片描述

Flink CDC 提供了可用于 YAML 作业的 Pipeline Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在 YAML Pipeline 定义中指定所需的连接器。
在这里插入图片描述

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件(其中一些组件是基于Debezium来获取数据变更,它可以充分利用Debezium的能力)。使用这些组件可以通过Flink SQL或代码开发的方式获取目标数据库的全量数据和增量变更数据。
ConnectoryDatabaseDrivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4mysql-cdcMySQL: 5.6, 5.7, 8.0.xRDS MySQL: 5.6, 5.7, 8.0.xPolarDB MySQL: 5.6, 5.7, 8.0.xAurora MySQL: 5.6, 5.7, 8.0.xMariaDB: 10.xPolarDB X: 2.0.1JDBC Driver: 8.0.28oceanbase-cdcOceanBase CE: 3.1.x, 4.xOceanBase EE: 2.x, 3.x, 4.xOceanBase Driver: 2.4.xoracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

二、使用Pipeline连接器实时获取数据

2.1 环境介绍

我们下面将主要展示通过使用Pipeline连接器来获取实时数据的流程。
准备环境:

  • 单节点的standalone模式的Flink集群
  • Flink CDC
  • 单节点Kafka
  • Doris快速体验版数据库
  • Mysql测试数据库

2.2 相关版本信息

  • Flink 1.18
  • Flink CDC 3.11
  • Kafka 3.6.1
  • Doris doris-2.0.3-rc06

2.3 详细步骤

引入所需依赖包
将 flink-cdc-pipeline-connector-doris-3.1.1.jar flink-cdc-pipeline-connector-kafka-3.1.1.jar flink-cdc-pipeline-connector-mysql-3.1.1.jar放入flink cdc的lib文件夹下

2.3.1 实时获取MySQL数据并发送到Kafka

1.编写同步变更配置文件
将yaml文件放入到flink-cdc下的job文件夹中

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: kafkatopic: test003properties.bootstrap.servers: xxx.xxx.xxx.xxx:9092format: jsonpipeline:name: Sync MySQL Data to KAFKAparallelism: 2

2.启动Flink集群

# 在flink/bin下执行
./start-cluster.sh

3.启动Flink CDC 任务

# 在flink-cdc-3.1.1/bin下运行
./flink-cdc.sh ../job/mysql-to-kafka.yaml

启动成功
在这里插入图片描述

4.启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic test003

5.在源数据库中修改数据并观察Kafka消费者
当在源数据库testfid表进行数据新增,删除或修改,Kafka消费者即能消费到对应数据
在这里插入图片描述

2.3.2 实时获取MySQL数据并同步到Doris数据库

1.编写同步变更配置文件

# 数据来源
source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8# 数据去向
sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2将yaml文件放入到flink-cdc下的job文件夹中

2.在Doris中创建数据库 doris_test

create database doris_test;

3.启动Flink CDC任务

# 在/app/path/flink-cdc-3.1.1/bin下执行
./flink-cdc.sh ../job/mysql-to-doris.yaml

4.进行数据变更并观察结果
先查看任务启动前源库MySQL和目标库Doris的数据情况,源库MySQL中共有两个表且表中已存在一些数据,Doris中没有表
在这里插入图片描述
在这里插入图片描述
启动任务后,两个表及数据都已同步到Doris中,当源表数据变更及表结构变更时,也都会实时同步到Doris中
在这里插入图片描述

5.进行路由变更后再进行测试并观察结果
Flink CDC Pipeline连接器也支持将两个同样表结构表的数据同步到目标数据库的一个表中

source:type: mysqlhostname: xxx.xxx.xxx.xxxport: 3306username: rootpassword: "password"tables: doris_test.\.*server-id: 5400-5404server-time-zone: UTC+8sink:type: dorisfenodes: xxx.xxx.xxx.xxx:8030username: rootpassword: "password"table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1# 将源库中testfid和testfid_copy1表的数据同步到目标库的表route_test中
route:- source-table: doris_test.testfidsink-table: doris_test.route_test- source-table: doris_test.testfid_copy1sink-table: doris_test.route_testpipeline:name: Sync MySQL Database to Dorisparallelism: 2

源数据
在这里插入图片描述
启动任务后Doris中的数据
在这里插入图片描述

源库中两个表的数据被合并同步到目标库的一个表中,但这只适用于相同表结构的合并,如果是不同表结构合并会造成数据错乱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/380376.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu22.04安装CUDA+CUDNN+Conda+PyTorch

步骤: 1、安装显卡驱动; 2、安装CUDA; 3、安装CUDNN; 4、安装Conda; 5、安装Pytorch。 一、系统和硬件信息 1、Ubuntu 22.04 2、显卡:4060Ti 二、安装显卡驱动 (已经安装的可以跳过&a…

秒懂设计模式--学习笔记(11)【结构型-享元模式】

目录 10、享元模式10.1 享元模式10.2 举例10.2.1 马赛克10.2.2 游戏地图(以草原地图作为范例) 10.3 总结 10、享元模式 10.1 享元模式 “享元”则是共享元件的意思享元模式的英文flyweight是轻量级的意思,这就意味着享元模式能使程序变得更…

Selenium+Pytest自动化测试框架实战

前言# selenium自动化 pytest测试框架 本章你需要 一定的python基础——至少明白类与对象,封装继承 一定的selenium基础——本篇不讲selenium,不会的可以自己去看selenium中文翻译网 测试框架简介# 测试框架有什么优点呢: 代码复用率高&a…

IDEA的断点调试(Debug)

《IDEA破解、配置、使用技巧与实战教程》系列文章目录 第一章 IDEA破解与HelloWorld的实战编写 第二章 IDEA的详细设置 第三章 IDEA的工程与模块管理 第四章 IDEA的常见代码模板的使用 第五章 IDEA中常用的快捷键 第六章 IDEA的断点调试(Debug) 第七章 …

# Redis 入门到精通(七)-- redis 删除策略

Redis 入门到精通(七)-- redis 删除策略 一、redis 删除策略–过期数据的概念 1、Redis 中的数据特征 Redis 是一种内存级数据库,所有数据均存放在内存中,内存中的数据可以通过TTL指令获取其状态。 XX :具有时效性…

脑肿瘤有哪些分类? 哪些人会得脑肿瘤?

脑肿瘤,作为一类严重的脑部疾病,其分类复杂多样,主要分为原发性脑肿瘤和脑转移瘤两大类。原发性脑肿瘤起源于颅内组织,常见的有胶质瘤、脑膜瘤、生殖细胞瘤、颅内表皮样囊肿及鞍区肿瘤等。其中,胶质瘤作为最常见的脑神…

【Vue】深入了解 Axios 在 Vue 中的使用:从基本操作到高级用法的全面指南

文章目录 一、Axios 简介与安装1. 什么是 Axios?2. 安装 Axios 二、在 Vue 组件中使用 Axios1. 发送 GET 请求2. 发送 POST 请求 三、Axios 拦截器1. 请求拦截器2. 响应拦截器 四、错误处理五、与 Vuex 结合使用1. 在 Vuex 中定义 actions2. 在组件中调用 Vuex acti…

C语言 ——— 写一个函数,调整 整型数组 中 奇数偶数的顺序

目录 题目要求 代码实现 题目要求 创建一个整型数组 自定义函数实现:调整该数组中数字的顺序,使得数组中所有的奇数位于数组的前半部分,数组中所有的偶数位于数组的后半部分 举例: 输入的整型数组为:[234,24,45,…

价格较低,功能最强?OpenAI 推出 GPT-4o mini,一个更小、更便宜的人工智能模型

OpenAI美东时间周四推出“GPT-4o mini”,入局“小而精”AI模型竞争,称这款新模型是“功能最强、成本偏低的模型”,计划今后整合图像、视频、音频到这个模型中。 OpenAI表示,GPT-4o mini 相较于 OpenAI 目前最先进的 AI 模型更加便…

第2章 矩阵

A 乘以此列向量,1的位置依次往下,所以A的列向量全为0 B C、D 取BE 要统一

ETL数据集成丨主流ETL工具(ETLCloud、DataX、Kettle)数据传输性能大PK

目前市面上的ETL工具众多,为了方便广大企业用户在选择ETL工具时有一个更直观性能方面的参考值,我们选取了目前市面上最流行的三款ETL工具(ETLCloud、DataX、Kettle)来作为本次性能传输的代表,虽然性能测试数据有很多相…

2024年7月17日(nodejs,npm设置国内镜像,vue脚手架,远程管理ssh,踢出用户,scp命令,ssh免密登录)

1、安装nodejs服务 nodejs是一个运行1环境,和javajdk运行环境格式一样 [roota ~]# yum -y install nodejs.x86_64 安装完成之后,使用node -v 查看版本 [roota ~]# node -v v16.20.2 2、简易服务器的环境安装npm 安装包管理器 npm node packae manger [ro…

为什么要从C语言开始编程

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「C语言的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!!很多小伙伴在入门编程时。都…

01 机器学习概述

目录 1. 基本概念 2. 机器学习三要素 3. 参数估计的四个方法 3.1 经验风险最小化 3.2 结构风险最小化 3.3 最大似然估计 3.4 最大后验估计 4. 偏差-方差分解 5. 机器学习算法的类型 6. 数据的特征表示 7. 评价指标 1. 基本概念 机器学习(Machine Le…

【GraphRAG】微软 graphrag 效果实测

GraphRAG 本文将基于以下来源,对Microsoft GraphRAG分析优缺点、以及示例实测分析。 1. Source 代码仓库: Welcome to GraphRAGhttps://microsoft.github.io/graphrag/ 微软文章1(2024.2.13):GraphRAG: Unlocking…

FinClip 率先入驻 AWS Marketplace,加速全球市场布局

近日,凡泰极客旗下的小程序数字管理平台 FinClip 已成功上线亚马逊云科技(AWS)Marketplace。未来,FinClip 将主要服务于海外市场的开放银行、超级钱包、财富管理、社交电商、智慧城市解决方案等领域。 在全球市场的多样性需求推动…

Guns v7.3.0:基于 Vue3、Antdv 和 TypeScript 打造的开箱即用型前端框架

摘要 本文深入探讨了Guns v7.3.0前端项目,该项目是基于Vue3、Antdv和TypeScript的前端框架,以Vben Admin的脚手架为基础进行了改造。文章分析了Guns 7.3.0的技术特点,包括其使用Vue3、vite2和TypeScript等最新前端技术栈,以及提供…

缓存弊处的体验:异常

缓存(cache),它是什么东西,有神马用,在学习内存的时候理解它作为一个存储器,来对接cpu和内存,来调节cpu与内存的速度不匹配的问题。 缓存,一个偶尔可以听到的专业名词,全…

哪种SSL证书可以快速签发保护http安全访问?

用户访问网站,经常会遇到访问http网页时,提示网站不安全或者不是私密连接的提示,因为http是使用明文传输,数据传输中可能被篡改,数据不被保护,通常需要SSL证书来给数据加密。 SSL证书的签发速度&#xff0…

甲骨文闲置ARM实例防回收的方法

前几日挖了个大坑,今天补一下,谈谈甲骨文闲置实例如何防止回收。 回收原则 2022年11月16日 Oracle添加声明: 从 2022 年 11 月 24 日开始,您闲置的 Always Free 计算实例可能会停止。巴拉巴拉,您还可以随时升级您的帐…