【大数据】Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT

Flink SQL 语法篇(二)

  • 1.WITH 子句
  • 2.SELECT & WHERE 子句
  • 3.SELECT DISTINCT 子句

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (SELECT order_id, price + tax AS totalFROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM OrdersINSERT INTO target_table
SELECT order_id, price + tax FROM OrdersINSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248088.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

成本更低、更可控,云原生可观测新计费模式正式上线

云布道师 在上云开始使用云产品过程中,企业一定遇见过两件“讨厌”事: 难以理解的复杂计费逻辑,时常冒出“这也能收费”的感叹; 某个配置参数调节之后,云产品使用成本不可预估的暴涨。 可观测作为企业 IT 运维必须品…

Python基础(二十九、pymsql)

文章目录 一、安装pymysql库二、代码实践1.连接MySQL数据库2.创建表格3.插入数据4.查询数据5.更新数据6.删除数据 三、完整代码示例四、结论 使用Python的pymysql库可以实现数据存储,这是一种连接MySQL数据库的方式。在本篇文章中,将详细介绍如何使用pym…

【LeetCode每日一题】56. 合并区间插入区间

一、判断区间是否重叠 力扣 252. 会议室 给定一个会议时间安排的数组 intervals ,每个会议时间都会包括开始和结束的时间 intervals[i] [starti, endi] ,请你判断一个人是否能够参加这里面的全部会议。 思路分析 因为一个人在同一时刻只能参加一个会…

Django知识随笔

目录 1.如何再ajax中传输post数据? 2.在form表单中使用jquery序列化,input框过多。 1.如何再ajax中传输post数据? 在ajax传递的那个网址,会调用你路由的视图函数,在视图函数上面加一句 csrf_exempt 。写上之后会有提…

获取依赖aar包的两种方式-在android studio里引入 如:glide

背景:我需要获取aar依赖到内网开发,内网几乎代表没网。 一、 如何需要获取依赖aar包 方式一:在官方的github中下载,耗时不建议 要从开发者网站、GitHub 存储库或其他来源获取 ‘com.github.bumptech.glide:glide:4.12.0’ AAR 包&#xff…

应急消防应用步入“繁花”时代,卓翼智能消防无人机顺势而行大有可为

近日,北京卓翼智能科技有限公司(以下简称“卓翼智能”)宣布完成超亿元B轮融资,融资金额高达2.5亿元。这个“智能无人系统”黑马品牌,凭什么出圈?重点发力在哪些领域呢?今天,带你走进…

CodeLocator 避免控制台弹出一堆错误日志

现象 使用codeLocator 插件 控制台经常打印出一堆的错误日志。和项目本身无关。影响了我们排查错误的效率。 解决办法 在Application的onCreate方法中加入下面的代码。 //避免控制台弹出一堆的错误日志CodeLocator.config(new CodeLocatorConfig.Builder().enableHookInfla…

xcode安装visionOS Simulator模拟器报错解决方法手动安装方法

手动安装方法: 手动下载visionOS Simulator模拟器地址: https://developer.apple.com/download/all/ 选择 Xcode 版本 sudo xcode-select -s /Applications/Xcode.app # 用 Xcode-beta 的话是: # xcode-select -s /Applications/Xcode-beta…

Shell中正则表达式

1.正则表达式介绍 1、正则表达式---通常用于判断语句中,用来检查某一字符串是否满足某一格式 2、正则表达式是由普通字符与元字符组成 3、普通字符包括大小写字母、数字、标点符号及一些其他符号 4、元字符是指在正则表达式中具有特殊意义的专用字符&#xff0c…

Java集合-Map接口(key-value)

Map接口的特点:①KV键值对方式存储②Key键唯一,Value允许重复③无序。 Map有四个实现类:1.HashMap类2.LinkedHashMap类3.TreeMap类4.Hashtable类 1.HashMap类: 存储结构:哈希表 数组Node[ ] 链表(红黑…

STM32的GPIO的详细配置指南

1. GPIO简介 GPIO(General Purpose Input/Output)是用于在微控制器中与外部世界通信的接口。通过GPIO,微控制器可以控制外部设备(如LED、LCD、按键等)的状态,也可以接收外部设备的状态(如传感器…

用AI工具一键生成原创文案的方法

一键生成原创文案对于文案工作者来说它是一种高效率创作文案内容的方法。文案工作者知道创作文案是一件消耗精力和时间的事情,遇到没有创作灵感,想要写一篇高质量的文案内容简直难上加难,因此,互联网上出现了一键生成原创文案的方…

Linux下安装edge

edge具有及其强大的功能,受到很多人的喜爱,它也开发Linux版本,下面是安装方法: 1.去edge官网下载Linux(.deb)文件。 https://www.microsoft.com/zh-cn/edge/download?formMA13FJ 2.下载之后输入以下指令(后面是安装…

【算法专题】贪心算法

贪心算法 贪心算法介绍1. 柠檬水找零2. 将数组和减半的最少操作次数3. 最大数4. 摆动序列(贪心思路)5. 最长递增子序列(贪心算法)6. 递增的三元子序列7. 最长连续递增序列8. 买卖股票的最佳时机9. 买卖股票的最佳时机Ⅱ(贪心算法)10. K 次取反后最大化的数组和11. 按身高排序12…

WPOpenSocial实现WordPress的QQ登录

个人建站不可避免的需要自己搭建用户数据库的问题,可用户却往往因为注册繁琐而放弃浏览您的网站,由此可见,一个社交账号一键登录方式尤为重要。选择适合您网站需求的社交插件,可以提升用户互动,增加社交分享&#xff0…

【C++】类和对象(一)

前言:在前面我们带大家初步步入了C,让大家大概知道了他的样子,那今天就可以说我们要正式步入C的大门了,这一章内容的细节比较多各位学习的时候一定要仔细。 💖 博主CSDN主页:卫卫卫的个人主页 💞 &#x1f…

Flink实战三_TableAPISQL

接上文:Flink实战二_DataStream API 1、Table API和SQL是什么? 接下来理解下Flink的整个客户端API体系,Flink为流式/批量处理应用程序提供了不同级别的抽象: 这四层API是一个依次向上支撑的关系。 Flink API 最底层的抽象就是有…

AR眼镜_ar智能眼镜显示方案|光学方案

AR眼镜是一种智能眼镜,能够将虚拟现实和现实世界相结合,使人们能够在日常生活中体验和参与虚拟现实。然而,AR智能眼镜的制造成本高,开发周期长。要实现AR眼镜的各项功能,需要良好的硬件条件,而AR智能眼镜的…

大专生能不能学习鸿蒙开发?

目前安卓有2,000万的开发者。本科及以上学历占比为35%;iOS有2,400万开发者,本科及以上学历占比为40% 绝大多数的前端开发者都是大专及以下学历,在2023年华为开发者大会上余承东透露华为的开发者目前有200万,但鸿蒙开发者统计的数据…

【Lazy ORM 整合druid 实现mysql监控】

Lazy ORM 整合druid 实现mysql监控 JDK 17 Lazy ORM框架地址 up、up欢迎start、issues 当前项目案例地址 框架版本描述spring-boot3.0.7springboot框架wu-framework-web1.2.2-JDK17-SNAPSHOTweb容器Lazy -ORM1.2.2-JDK17-SNAPSHOTORMmysql-connector-j8.0.33mysql驱动druid-…