Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。

与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。

CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:

KeyDataTypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.
row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message.

其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,`op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',...
);

注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)

当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

在这里插入图片描述

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:

If the record is read from snapshot of the table instead of the binlog, the value is always 0.

我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:

当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间

这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。

此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!


补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
  • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
  • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
  • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/267529.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入剖析k8s-控制器思想

引言 本文是《深入剖析Kubernetes》学习笔记——《深入剖析Kubernetes》 正文 控制器都遵循K8s的项目中一个通用的编排模式——控制循环 for {实际状态 : 获取集群中对象X的实际状态期望状态 : 获取集群中对象X的期望状态if 实际状态 期望状态 {// do nothing} else {执行…

LeetCode 2120.执行所有后缀指令

现有一个 n x n 大小的网格,左上角单元格坐标 (0, 0) ,右下角单元格坐标 (n - 1, n - 1) 。给你整数 n 和一个整数数组 startPos ,其中 startPos [startrow, startcol] 表示机器人最开始在坐标为 (startrow, startcol) 的单元格上。 另给你…

多行业万能预约门店小程序源码系统 支持多门店预约小程序 带完整的安装代码包以及搭建教程

随着消费者对于服务体验要求的不断提升,门店预约系统成为了许多行业提升服务质量、提高运营效率的重要工具。然而,市面上的预约系统往往功能单一,无法满足多行业、多场景的个性化需求。下面,小编集合了多年的行业经验和技术积累&a…

11:日志分析系统ELK|Elasticsearch|kibana

日志分析系统ELK|Elasticsearch|kibana 日志分析系统ELKELK概述Elasticsearch安装Elasticsearch部署Elasticsearch集群Elasticsearch插件 熟悉Elasticsearch的API调用_cat API创建 tedu 索引使用 PUT 方式增加数据查询数据修改数据删除数据 KibanaKibana…

Android T 远程动画显示流程其三——桌面侧动画启动到系统侧结束流程

前言 接着前文分析Android T 远程动画显示流程其二 我们通过IRemoteAnimationRunner跨进程通信从系统进程来到了桌面进程,这里是真正动画播放的逻辑。 之后又通过IRemoteAnimationFinishedCallback跨进程通信回到系统进程,处理动画结束时的逻辑。 进入…

基于yolov5的电瓶车和自行车检测系统,可进行图像目标检测,也可进行视屏和摄像检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示: 基于yolov5的电瓶车和自行车检测系统_哔哩哔哩_bilibili (一)简介 基于yolov5的电瓶车和自行车检测系统是在pytorch框架下实现的,这是一个完整的项目,包括代码,数据集,训练好的模型…

svn介绍 4.0

一、svn介绍(版本控制工具) 1、svn的定义: svn是一个开放源代码的版本控制系统,通过采用分支管理系统的高效管理,简而言之就是用于多个人共同开发同一个项目,实现共享资源,实现最终集中式个管…

2024年sCrypt编程马拉松即将开幕

BSV区块链的建设者们,你们在哪?2024年sCrypt编程马拉松即将拉开帷幕! 2024年3月16日至17日,我们将在旧金山市举办一场以比特币智能合约(即 sCrypt)和比特币通证(如Ordinals)相结合为…

【蛀牙】日常生活如何正确护理牙齿?刷牙、洗牙、补牙

程序员生活指南之 【蛀牙】日常生活如何正确护理牙齿?刷牙、洗牙、补牙 文章目录 一、日常如何清洗牙齿?——刷牙与洗牙1、牙齿污垢1.1 牙菌斑1.2 软垢1.3 牙结石1.4 牙龈出血 2、如何刷牙2.1 关于时间2.2 各种工具2.3 巴氏刷牙法 二、定期进行洗牙3、如…

链表OJ刷题(二)

制作不易,三连支持一下呗!!! 文章目录 前言一、链表的回文结构二、相交链表三、链表中倒数第k个节点四、环形链表Ⅰ和Ⅱ总结 前言 一、链表的回文结构 链表的回文结构_牛客题霸_牛客网 这里我们需要先了解一下什么叫做回文&#…

React之组件定义和事件处理

一、组件的分类 在react中,组件分为函数组件和class组件,也就是无状态组件和有状态组件。 * 更过时候我们应该区别使用无状态组件,因为如果有状态组件会触发生命周期所对应的一些函数 * 一旦触发他生命周期的函数,它就会影响当前项…

MQL5学习之RSI指标编写

研究MT5时发现MQL5这个指标编写功能很强大,应该是碾压国内所有的指标系统,不过这个东西相对复杂很多,比通达信公式不知复杂几许,看起来和C语法接近,倒是比较适合自己。试着玩一下,发现还是有点难度的。索性…

修改centos7的dns解决docker拉取镜像超时问题

近期在一台centos7的服务器上部署系统,拉取docker镜像时总是超时,如图所示。网上有教程说,可以修改操纵系统的dns地址,试了一下,果然搞定。 打开dns配置文件 sudo vi /etc/resolv.conf发觉里面的地址设为114.114.114…

Qt篇——QTableWidget保存表格数据到Excel文件中,读Excel内容到QTableWidget

表格和excel例子如下图所示&#xff1a; 一、QTableWidget保存表格数据到Excel文件中 代码如下&#xff1a; &#xff08;pro文件中添加QT axcontainer&#xff09; #include <QAxObject>void MainWindow::saveTableToExcel() {QDateTime current_date_time QDateTi…

交友社交软件开发-php交友聊天系统-

为了开发一个高效的交友系统&#xff0c;需要一个完善的信息管理和筛选机制。这个系统应该能够根据用户的个人信息、兴趣爱好、价值观等标准进行筛选&#xff0c;并向用户提供符合他们要求心仪的人的信息。为了实现这个目标&#xff0c;系统可以利用人工智能技术&#xff0c;分…

经销商文件分发 怎样兼顾安全和效率?

经销商文件分发是指将文件、资料、产品信息等从制造商或经销商传递给经销商的过程。这一过程对于确保经销商能够获取最新的产品信息、销售策略、市场活动资料等至关重要。 想要管理众多经销商合作伙伴之间的文件传输并提高效率&#xff0c;可以采取以下措施&#xff1a; 1、建…

中文文本分类(pytorch 实现)

import torch import torch.nn as nn import torchvision from torchvision import transforms, datasets import os, PIL, pathlib, warningswarnings.filterwarnings("ignore") # 忽略警告信息# win10系统 device torch.device("cuda" if torch.cuda.i…

【yolov8部署实战】VS2019环境下使用Onnxruntime环境部署yolo项目|含源码

一、前言 部署yolo项目&#xff0c;是我这几个月以来做的事情&#xff0c;最近打算把这几个月试过的方法&#xff0c;踩过的坑&#xff0c;以博客的形式&#xff0c;分享一下。关于下面动态中讲到的如何用opencv部署&#xff0c;我在上一篇博客中已经详细讲到了&#xff1a;【…

JavaWeb 自己给服务器安装SQL Server数据库遇到的坑

之前买的虚拟主机免费送了一个SQL Server数据库&#xff0c;由于服务器提供商今年下架我用的那款虚拟主机产品&#xff0c;所以数据库也被收回了。我买了阿里云云服务器&#xff0c;但是没有数据库&#xff0c;于是自己装了一个SQL Server数据库&#xff0c;总结一下遇到的坑。…

el-input组件当数据为空时, 边框变红,并提示错误信息

1&#xff0c;样式 初始&#xff1a; 当不输入口令&#xff0c; 点击确定时&#xff1a; 2, 思路 主要是使用动态类的方式。 先设置输入框变红的样式以及提示文字的样式class 对于样式class 用变量来控制是否奏效。 3&#xff0c; 代码实现 //html&#xff1a; <div cl…