09 flink-sql 中基于 mysql-cdc 的 select * from test_user 的具体实现

前言

这也是最近帮一个朋友看问题 遇到的一个问题 

然后 引发了一下 对于 flink-sql 里面的一些 常规处理的思考, 理解 

原始问题主要是 在测试库可以使用 flink-sql 可以正常同步, 但是 在生产环境 无法正常同步数据 

这个问题 我们后面单独 记录一篇文章 

87fe04f3239e4e768da72132e7774269.png

 

 

测试用例

下载 flink-1.13.6, 首先启动一个 standalone 的集群 

master:flink-1.13.6 jerry$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.

 

启动 flink sql-client 

master:flink-1.13.6 jerry$ ./bin/sql-client.sh 
Listening for transport dt_socket at address: 5007
No default environment specified.
Searching for '/Users/jerry/Downloads/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /Users/jerry/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

 

创建 flink-sql 的表结构 

CREATE TABLE test_user (
`name` string,
`age` string,
PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'postgres',
'database-name' = 'test',
'table-name' = 'test_user'
);

 

源表数据如下 

d53a95f8b01a4676981c365726074779.png

 

然后 flink-sql 这边查询 结果如下 

1fbd5138ad994a5383fbcb31bd47d9f4.png

 

然后 我们来看一下 这里的整个处理流程 

 

 

flink-sql 中 select * from test_user 获取全量数据的调试

首先这里交互的角色抽象的可以理解为两个, 一个是 flink 集群, 一个是 flink sql-client 

然后 flink sql-client 这边组合查询, 相关业务, 然后创建一个 flink 任务, 抛给 flink 集群 

然后 两者进行交互, 首先是拿到 test_user 的快照全量数据, 然后 flink sql-client 这边做业务展示 

然后 test_user 的之后的增删改查, 的处理是基于 mysql binlog 这边来做增量处理 

我们这里 仅仅演示 test_user 的快照数据获取 以及 在 test_user 中增加一条记录, 然后 flink sql-client 这边是 怎么获取到的这个整个流程 

 

如下这里是 flink sql-client 这边将用户输入的 " select * from test_user " 转换为 flink sql 上下文的 operations, 然后封装成 pipeline, 提交给 flink 集群 

11a2df70ea0a4b589f0a9e7c2cfaef2a.png

 

 

看一下 flink 集群这边任务的执行, 首先是 第一次的全量数据的快照 

从 CollectSinkFunction 这边从 buffer 中获取到两条记录, 大致可以看出是 第一条记录 和 第二条记录, 然后 sendBackResults 通过 tcp交互, 将这两条数据对应的 StreamRecord 传输回 flink sql-client 

0609d767ecca4bce9d4605071734d1f9.png

 

往前回溯, 看一下 真正执行查询的地方, 执行的是 "select * from test_user;" 

然后这里迭代会将 查询的记录封装成为 SourceRecord, 然后添加到 recorderMarker 的 bufferedRecordQueue 中 

92e1ba3608324a0b847d9289c3c34dc1.png

 

然后这个 bufferedRecordQueue 是一个队列, 会将消耗的元素调用 enqueueRecord 将数据放入到 records 中 

6b48eaeadc2549758874c0f7b5bfcf25.png

 

这里是更细节的 enqueueRecord 的执行流程, 比如这里 迭代的事 第一条记录 

1c3973e8afe84d86af26ef340e11bb32.png

 

然后接着是 更上一层 Engine 的业务流程, 他会将 SnapshotReader 这边读取的记录更新到 batch 中 

351bda6f26bf4455b4d2f1adfb10a528.png

 

然后就是 Engine 这边的任务的执行, 将数据经过 map, filter, NotNull, 等等相关处理 

最终到达 CollectSinkFunction 这边 

1cf7e36ff8344e1f9391efc607f06186.png

 

然后 CollectSinkFunction 这边将数据封装成 GenericRowData, 然后序列化, 放入 buffer 队列 

然后 最终就是 CollectSinkFunction 上面的流程, 将序列化之后的数据通过 CollectCoordinationResponse 回传给 flink sql-client 

b0fa1585ee8c4f99b1aff634cf651099.png

 

 

然后 flink sql-client 这边的处理如下  

将拿到的数据, 添加到 buffer  队列 

3a6a2ea3c8344bd1be5fa3723ecb62f7.png

dc17460a9c7e409b8701bcf524ac4b56.png

 

然后就是 flink sql-client 这边的主线程的处理了, 从 buffer 中迭代 记录出来, 然后 放到 materializedTable, 然后 之后 cli 这边获取表格数据的时候, 将其传输到 snapshot 中 

494c6dd347c84e6a9548d88ba2d01e01.png

c677e808922f43cb83bc423ff641252b.png

 

 

flink sql-client 这边的展示流程如下 

1b2cf8d16f3b4cf097c9df6b07ef4f66.png

 

然后 做具体的展示, 展示结果如下, 然后 随着之后的迭代 能够获取完整当前页的数据, 展示在 cli 中 

f90e68011bb843cb9196cf2ca7002bfa.png

eae2ea9b78e54ea9a484632a45c7ab34.png

 

 

 

 

flink-sql 中 select * from test_user 获取增量数据的调试

增量数据的获取, 来自于 BinlogClient 这边的获取, 连接 mysql 的服务 

发送获取 binlog 的命令, 然后 之后 mysql 这边有 binlog 的事件之后, 会将相关 事件传递到 BinlogClient 这边 

比如这里 执行了一个 ”insert into test_user (`name`, age) select max(age)+1, max(age+1) from test_user;”, 增加了一条记录 (3, 3) 

然后这边 反序列化之后, 读取到 WriteRowsEvent 数据为 (3, 3) 

5ef9dc67350d489b91903f2abb0547c0.png

 

然后就是 BinlogClient 的后续流程, 将数据使用 recordMarker 记录 

和上面 SnapshotReader 这边处理一样, recorderMarker 会将 SourceRecord记录 添加到 records 列表, 由外层 Engine 层轮询 records 将其进行任务的执行, 到后面的 CollectSinkFunction 传输给 flink sql-client 这边做数据增删改查, 以及展示 

e596147a59714192b4bafcfa5f73d28a.png

 

为记录 (3, 3) 生成 SourceRecord 并放到 records 队列 

34bbed10370743b695fd3af2c99174be.png

 

Engine 层的处理, 其他的这里就不细化了 

f08ed652eda44ad39fc545c7290248f3.png

 

 

 

 

flink mysql-cdc MysqlConnectorTask 的处理 

我们可以看到上面 全量读取使用了 快照读, 然后增量的部分使用基于 binlog 来进行处理 

那么这个 处理流程是在这里呢? 

9da3b393683b4988be75f76ab669e9ef.png

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/301036.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第48天|198.打家劫舍|213.打家劫舍II| 337.打家劫舍III

代码随想录算法训练营第48天|198.打家劫舍|213.打家劫舍II| 337.打家劫舍III 今天就是打家劫舍的一天&#xff0c;这个系列不算难&#xff0c;大家可以一口气拿下。 198.打家劫舍 视频讲解&#xff1a;https://www.bilibili.com/video/BV1Te411N7SX https://programmercarl.c…

系统架构评估_2.SAAM方法

SAAM&#xff08;Scenarios-based Architecture Analysis Method&#xff09;是卡耐基梅隆大学软件工程研究所&#xff08;SEI at CMU&#xff09;的Kazman等人于1983年提出的一种非功能质量属性的架构分析方法&#xff0c;是最早形成文档并得到广泛使用的软件架构分析方法。最…

大语言模型上下文窗口初探(下)

由于篇幅原因&#xff0c;本文分为上下两篇&#xff0c;上篇主要讲解上下文窗口的概念、在LLM中的重要性&#xff0c;下篇主要讲解长文本能否成为LLM的护城河、国外大厂对长文本的态度。 3、长文本是护城河吗&#xff1f; 毫无疑问&#xff0c;Kimi从一开始就用“长文本”占领…

电脑硬件 - 硬盘

硬盘是一台电脑的数据中心&#xff0c;存放着我们用户的所有文件和数据 对于一块硬盘&#xff0c;其重要指标&#xff1a;顺序读写能力&#xff0c;随机读写能力 顺序读写影响大文件的拷贝&#xff0c;随机读写影响大量小文件的拷贝&#xff08;打开软件的快慢&#xff09; 因…

揭秘Symfony DomCrawler库的爬虫魔力:获取网易新闻热点

在这个信息爆炸的时代&#xff0c;新闻热点不仅仅是传递信息的渠道&#xff0c;它们还能够影响和引导公众舆论。Symfony DomCrawler库作为一个强大的爬虫工具&#xff0c;可以帮助我们理解这种现象&#xff0c;通过获取和分析网易新闻热点&#xff0c;我们可以洞察舆情的走向。…

系统监测工具-tcpdump的使用

一个简单的tcpdump抓包过程。主要抓包观察三次握手&#xff0c;四次挥手的数据包 有两个程序&#xff1a;客户端和服务器两个程序 服务器端的ip地址使用的是回环地址127.0.0.1 端口号使用的是6000 tcpdump -i 指定用哪个网卡等&#xff0c;dstip地址端口指定抓取目的地址…

【SpringBoot整合系列】SpringBoot整合FastDFS(二)

目录 SpringBoot整合FastDFSJava客户端/依赖常用api接口解释1.uploadFile参数返回值 2.uploadSlaveFile参数返回值 3.getMetadata参数返回值 4.overwriteMetadata参数&#xff1a;返回值&#xff1a;无 5.mergeMetadata参数&#xff1a;返回值&#xff1a;无 6.queryFileInfo参…

Nacos Namespace 未授权访问漏洞

Nacos Namespace 未授权访问漏洞 问题 nacos 源码启动&#xff0c;发现即使开启了鉴权&#xff1a;nacos.core.auth.enabledtrue&#xff0c;未登录情况下&#xff0c;命名空间列表接口仍旧能查询到数据 鉴权逻辑 通过**AuthFilter **进行权限校验判断方法上是否存在注解 …

idea开发 java web 疫情信息查询系统bootstrap框架web结构java编程计算机网页接口查询

一、源码特点 java 疫情信息查询系统是一套完善的完整信息系统&#xff0c;结合java web开发和bootstrap UI框架完成本系统 &#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 前段主要技术 css j…

深入理解GO语言——GC垃圾回收二

文章目录 前言一、Go V1.5的三色并发标记法总结 前言 书接上回&#xff0c;无论怎么优化&#xff0c;Go V1.3都面临这个一个重要问题&#xff0c;就是mark-and-sweep 算法会暂停整个程序 。 Go是如何面对并这个问题的呢&#xff1f;接下来G V1.5版本 就用 三色并发标记法 来优…

python开发poc2,爆破脚本

#本课知识点和目的&#xff1a; ---协议模块使用&#xff0c;Request 爬虫技术&#xff0c;简易多线程技术&#xff0c;编码技术&#xff0c;Bypass 后门技术 下载ftp服务器模拟器 https://lcba.lanzouy.com/iAMePxl378h 随便创建一个账户&#xff0c;然后登录进去把ip改成…

战争中的AI应用:道德、伦理与技术的交织

AI在战争中的应用是一个极具争议和复杂的话题&#xff0c;无法简单地回答是好还是坏。其影响取决于多个因素&#xff0c;包括使用方式、目的、伦理框架以及技术本身的发展水平。 一方面&#xff0c;AI在战争中具有潜在的积极作用。它可以提高军事行动的效率和精确性&#xff0c…

【MySQL】游标和触发器

一、游标 1.1 什么是游标 1、使用背景 在我们使用update或者delete操作数据时&#xff0c;一般都会根据条件语句查询出很多条记录组成的数据集&#xff0c;然后一次性批量操作 假设我们想要对这个结果集中的数据 一行一行的进行操作&#xff0c;比如某个条件满足后&#xff…

计算机毕业设计java 基于Android的拼图游戏app

当今社会&#xff0c;随着电子信息技术的发展&#xff0c;电子游戏也成为人们日常生活的一部分。这种娱乐方式结合了日新月异的技术&#xff0c;在游戏软件中结合了多种复杂技术。拼图游戏流行在各种电子产品上&#xff0c;从计算机&#xff0c;掌上游戏机到如今的手机&#xf…

Vue中的键盘事件

目 录 1. 概述 2. JavaScript 键盘事件 2.1 键盘事件类型 2.1.1 keydown 事件2.1.2 keypress 事件2.1.3 keyup 事件2.1.4 input 事件 2.2 键盘事件的响应顺序 3. Vue 键盘事件监听与处理 3.1 获取按键的 键码&#xff08;keyCode&#xff09;3.2 监听按键事件 4. Vue 按键…

【C++】继承总结

一、前言 我们众所周知的C三大特性分别为&#xff1a;封装、继承、多态。 封装就是将接口实现统一化&#xff0c;隐藏那些不同的地方&#xff0c;在上层函数调用体现的方式一样&#xff0c;如各种容器的迭代器iterator&#xff0c;尽管底层实现的方式不同&#xff0c;但是在使用…

2024免费Mac电脑用户的系统清理和优化软件CleanMyMac

作为产品营销专家&#xff0c;对于各类产品的特性与优势有着深入的了解。CleanMyMac是一款针对Mac电脑用户的系统清理和优化软件&#xff0c;旨在帮助用户轻松管理、优化和保护Mac电脑。以下是关于CleanMyMac的详细介绍&#xff1a; CleanMyMac X2024全新版下载如下: https://…

ctfshow web入门 文件包含 web151--web161

web151 打算用bp改文件形式(可能没操作好)我重新试了一下抓不到 文件上传不成功 改网页前端 鼠标右键&#xff08;检查&#xff09;&#xff0c;把png改为php访问&#xff0c;执行命令 我上传的马是<?php eval($_POST[a]);?> 查看 web152 上传马 把Content-Type改为…

相机标定——四个坐标系介绍

世界坐标系(Xw,Yw,Zw) 世界坐标系是一个用于描述和定位三维空间中物体位置的坐标系&#xff0c;通常反映真实世界下物体的位置和方向。它是一个惯性坐标系&#xff0c;被用作整个场景或系统的参考框架。在很多情况下&#xff0c;世界坐标系被认为是固定不变的&#xff0c;即它…

【THM】Protocols and Servers 2(协议和服务器 2

介绍 协议和服务器房间涵盖了许多协议: 远程登录HTTP协议文件传输协议邮件传输协议POP3IMAP实现这些协议的服务器会受到不同类型的攻击。仅举几例,请考虑: 嗅探攻击(网络数据包捕获)中间人 ( MITM ) 攻击密码攻击(身份验证攻击)漏洞从安全的角度来看,我们始终需要思考…