Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium 格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。

同时,Flink 还有一种专门的数据格式 debezium-json,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?

首先,我们要主要到这样一个细节:mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式,对外,这不可配置,也不可见,只有向下游传导数据时,才会涉及到解析和转换的问题。

其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:

使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。

Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,但是,这都是发生在 Flink CDC 内部的,对外是不可见的!当需要把 CDC 数据传给下游时,才会针对下游指定的格式进行转换,这种转换也是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。

我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。

原生 Debezium 格式(样例)

使用如下 SQL 创建一个 mysql-cdc 的源表:

SET 'sql-client.execution.result-mode' = 'TABLEAU';DROP TABLE IF EXISTS orders_mysql_cdc;CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '10.0.13.30','port' = '3307','username' = 'root','password' = 'Admin1234!','database-name' = 'inventory','table-name' = 'orders'
);

那从 Flink CDC 源表提取出来的数据应该是什么样子呢?前面我们已经说过,这个动作发生在 Flink CDC 内部,提取的数据也是外部不可见的,那我们能不能从其他渠道确定实际的数据格式吗?能,如果说 Flink CDC 就是通过 Debezium 来采集数据,那么采集到的最原始的数据格式就是标准的 Debezium 格式,通常,这是这个样子的:

{"before": null,"after": {"osci.mysql-server-3.inventory.orders.Value": {"order_number": 10006,"order_date": 16852,"purchaser": 1003,"quantity": 1,"product_id": 107}},"source": {"version": "2.2.0.Final","connector": "mysql","name": "osci.mysql-server-3","ts_ms": 1705645511000,"snapshot": {"string": "false"},"db": "inventory","sequence": null,"table": {"string": "orders"},"server_id": 223344,"gtid": null,"file": "mysql-bin.000004","pos": 640,"row": 0,"thread": {"long": 10},"query": null},"op": "c","ts_ms": {"long": 1705645511455},"transaction": null
}

再次强调,上述格式的数据在 Flink CDC 中是不可见的,发生于 Flink CDC 内部,以上格式是标准的 debezium 数据格式,Flink CDC一定是率先拿到了这种格式的数据然后再经处理转发给下游的,比如:如果 DDL 中提取了某些元数据,也是从上面这种原始的 Debezium 数据中获取的。

debezium-json 格式(样例)

如下的 SQL 在 Kafka 上创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ... 把源表和目标表的数据流驱动起来:

DROP TABLE IF EXISTS orders_kafka_debezium_json;CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (order_number int,order_date   date,purchaser    int,quantity     int,product_id   int
) WITH ('connector' = 'kafka','topic' = 'orders_kafka_debezium_json','properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092','properties.group.id' = 'orders_kafka_debezium_json','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json'
);-- 提交持续查询,驱动整个 Pipelineinsert into orders_kafka_debezium_json select * from orders_mysql_cdc;

这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:

{"before": {"order_number": 10003,"order_date": "2016-02-19","purchaser": 1002,"quantity": 2,"product_id": 106},"after": null,"op": "d"
}

结论

比较上述两种消息格式就能看出:

debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,原生 debezium 格式仅发生并存在于 Flink CDC 内部,对外不可见,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/312195.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络 Cisco远程Telnet访问交换机和Console终端连接交换机

一、实验要求和内容 1、配置交换机进入特权模式密文密码为“abcd两位班内学号”,远程登陆密码为“123456” 2、验证PC0通过远程登陆到交换机上,看是否可以进去特权模式 二、实验步骤 1、将一台还没配置的新交换机,利用console线连接设备的…

深度解析SPARK的基本概念

关联阅读博客文章: 深入理解MapReduce:从Map到Reduce的工作原理解析 引言: 在当今大数据时代,数据处理和分析成为了企业发展的重要驱动力。Apache Spark作为一个快速、通用的大数据处理引擎,受到了广泛的关注和应用。…

IAR 使用笔记(IAR BIN大小为0异常解决)

烧写 由于芯片的内部SPI FLASH的0级BOOT 程序起到到开启JTAG SW 仿真功能,一旦内部SPI FLASH存储的BL0启动代码被损坏,芯片的JTAG 将不能被连接。所以对BL0的烧写需要谨慎,烧写BL0过程保证芯片不断电。 如果烧写了多备份的启动代码&#xff…

每日两题2

不同路径 class Solution { public:int uniquePaths(int m, int n) {vector<vector<int>> dp(m1, vector<int>(n1,0));//创建dp表dp[0][1] 1;//初始化//填表for(int i 1; i < m; i){for(int j 1; j < n; j){dp[i][j] dp[i-1][j] dp[i][j-1];}}ret…

Linux 内核学习(1) --- 时钟子系统

标题 时钟系统说明时钟树Clock Provider时钟通用数据结构clock_device 的注册clock_provider DTS配置和注册clock consumer时钟系统总结 时钟系统说明 时钟就是 SoC 中的脉搏&#xff0c;由它来控制各个部件按各自的节奏跳动。比如&#xff0c;CPU主频设置&#xff0c;串口的波…

潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏

游戏名称&#xff1a;潮玩宇宙大逃杀 游戏类型&#xff1a;休闲竞技类小游戏 游戏目标&#xff1a;玩家通过选择房间躲避杀手&#xff0c;生存下来并瓜分被杀房间的元宝。 核心功能 房间选择&#xff1a;玩家进入游戏后&#xff0c;可以选择一间房间躲避杀手。杀手行动&…

IDEA Warnings:SQL dialect is not configured.

springboot项目XxxMapper.xml文件打开后显示warnings&#xff1a;SQL dialect is not configured......&#xff08;翻译&#xff1a;未配置SQL语言。&#xff09; 大概意思是没有在IDEA中配置当前sql是MySQl、Oracle还是MariaDB等语言。 配置一下就好&#xff1a; 完了&#…

APIGateway的认证

APIGateway的支持的认证如下&#xff1a; 我们从表格中可以看到&#xff0c;HTTP API 不支持资源策略的功能&#xff0c;另外是通过JWT的方式集成Cognito的。 对于REST API则是没有显示说明支持JWT认证&#xff0c;这个我们可以通过Lambda 自定义的方式来实现。 所以按照这个…

本地搭建属于你自己的AI搜索引擎 支持多家AI模型

FreeAskInternet 是一个完全免费、私有且本地运行的搜索聚合器&#xff0c;并使用 MULTI LLM 生成答案&#xff0c;无需 GPU。用户可以提出问题&#xff0c;系统将进行多引擎搜索&#xff0c;并将搜索结果合并到LLM中&#xff0c;并根据搜索结果生成答案。全部免费使用。 项目…

Java springboot使用EasyExcel读Excel文件,映射不到属性值,对象属性值都是null

如果你的类上有这个注解&#xff0c;去掉火或注释掉就可以了 Accessors(chain true)解决方法

解决EasyPoi导入Excel获取不到第一列的问题

文章目录 1. 复现错误2. 分析错误2.1 导入的代码2.2 DictExcel实体类2.2 表头和标题3. 解决问题1. 复现错误 使用EasyPoi导入数据时,Excel表格如下图: 但在导入时,出现如下错误: name为英文名称,在第一列,Excel表格有值,但导入的代码中为null,就很奇怪? 2. 分析错误 …

【数据结构1-基本概念和术语】

这里写自定义目录标题 0.数据&#xff0c;数据元素&#xff0c;数据项&#xff0c;数据对项&#xff0c;数据结构&#xff0c;逻辑结构&#xff0c;存储结构1.结构1.1逻辑结构1.2存储结构1.2.1 顺序结构1.2.2链式结构 1.3数据结构1.3.1基本数据类型1.3.2抽象数据类型1.3.2.1一个…

【系统分析师】系统安全分析与设计

文章目录 1、安全基础技术1.1 密码相关1.1.1对称加密1.1.2非对称加密1.1.3信息摘要1.1.4数字签名1.1.5数字信封 1.2 PKI公钥体系 2、信息系统安全2.1 保障层次2.2 网络安全2.2.1WIFI2.2.2 网络威胁与攻击2.2.3 安全保护等级 2.3计算机病毒与木马2.4安全防范体系 1、安全基础技术…

探索数据结构:BF与KMP的灵活应用

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;数据结构与算法 贝蒂的主页&#xff1a;Betty’s blog 1. 什么是字符串匹配算法 字符串匹配是计算机科学中的一个基础概念&…

python创建word文档并向word中写数据

一、docx库的安装方法 python创建word文档需要用到docx库&#xff0c;安装命令如下&#xff1a; pip install python-docx 注意&#xff0c;安装的是python-docx。 二、使用方法 使用方法有很多&#xff0c;这里只介绍创建文档并向文档中写入数据。 import docxmydocdocx.Do…

SEO之搜索引擎的工作原理(三)

初创企业需要建站的朋友看这篇文章&#xff0c;谢谢支持&#xff1a;我给不会敲代码又想搭建网站的人建议 &#xff08;接上一篇。。。&#xff09; 排名 经过搜索引擎蜘蛛抓取页面&#xff0c;索引程序计算得到倒排索引后&#xff0c;搜索引擎就准备好可以随时处理用户搜索了…

C# 两种方法截取活动窗口屏幕,实现窗体截图

方法1&#xff0c;截屏内容仅包括活动窗口界面&#xff0c;而方法2是从屏幕范围取图&#xff0c;截屏内容会包括屏幕上所有内容。例如有一些程序在桌面顶层显示半透明的悬浮窗&#xff0c;用方法2截屏就会包括这些内容&#xff0c;并不是单纯的活动窗口内容。 方法1&#xff0c…

pyqt的人脸识别 基于face_recognition库

参考文献&#xff1a; 1、python face_recognition实现人脸识别系统_python facerecognition检测人脸-CSDN博客 2、cv2.VideoCapture()_cv2.videocapture(0)-CSDN博客 1、camera.py文件代码如下&#xff1b;目录如下 import sys from PyQt5.QtWidgets import QApplication, …

【机器学习】贝叶斯算法在机器学习中的应用与实例分析

贝叶斯算法在机器学习中的应用与实例分析 一、贝叶斯算法原理及重要性二、朴素贝叶斯分类器的实现三、贝叶斯网络在自然语言处理中的应用四、总结与展望 在人工智能的浪潮中&#xff0c;机器学习以其独特的魅力引领着科技领域的创新。其中&#xff0c;贝叶斯算法以其概率推理的…

Docker安装xxl-job分布式任务调度平台

文章目录 Docker安装xxl-job分布式任务调度平台1.xxl-job介绍2. 初始化“调度数据库”3、docker挂载运行xxl-job容器3.1、在linux的opt目录下创建xxl_job文件夹&#xff0c;并在里面创建logs文件夹和application.properties文件3.2、配置application.properties文件&#xff0c…