FlinkSql 窗口函数

Windowing TVF

以前用的是Grouped Window Functions(分组窗口函数),但是分组窗口函数只支持窗口聚合

现在FlinkSql统一都是用的是Windowing TVFs(窗口表值函数),Windowing TVFs更符合 SQL 标准且更加强大,支持window join、Window aggregations、Window Top-N、Window Deduplication 

Windowing TVFs是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数

窗口函数

Flink 认为窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算

有以下几种用法

  • 滚动窗口
  • 滑动窗口
  • 累积窗口
  • 会话窗口 (即将支持)

滚动窗口(TUMBLE)

TUMBLE 函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口,如下图所示:

TUMBLE 函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

--TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data :拥有时间属性列的表。
timecol :列描述符,决定数据的哪个时间属性列应该映射到窗口。
size :窗口的大小(时长)。
offset :窗口的偏移量 [非必填]。SELECT * FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

滑动窗口(HOP)

滑动窗口函数指定元素到一个定长的窗口中。和滚动窗口很像,有窗口大小参数,另外增加了一个窗口滑动步长参数。如果滑动步长小于窗口大小,就能产生数据重叠的效果。在这个例子里,数据可以被分配在多个窗口。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口,如下图所示:

HOP 函数通过时间属性字段为每一行数据分配了一个窗口。 在流计算模式,这个时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

-- HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
slide:窗口的滑动步长。
size:窗口的大小(时长)。
offset:窗口的偏移量 [非必填]。SELECT * FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

累积窗口(CUMULATE)

CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00)[00:00, 02:00)[00:00, 03:00), …, [00:00, 24:00)

-- CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
step:指定连续的累积窗口之间增加的窗口大小。
size:指定累积窗口的最大宽度的窗口时间。size必须是step的整数倍。
offset:窗口的偏移量 [非必填]。SELECT * FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

窗口偏移

上诉窗口都有一个 offset 参数,默认值就是 0,所以窗口默认都是整点启动的

比如10分钟的滚动窗口:TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES),只会生成[2021-06-29 23:40:00, 2021-06-29 00:50:00),[2021-06-29 23:50:00, 2021-06-30 00:00:00),window_start 和 window_end 和数据的时间无关

offset 就是用来调整窗口偏移的,当 offset 为 -16 MINUTE,时间戳为 2021-06-30 00:00:04 的数据会分配到窗口 [2021-06-29 23:54:00, 2021-06-30 00:04:00)。

窗口函数进阶用法

flink开窗需要写上windowend,否则只是带了一个windowstart的时间而已,并没有真正开启窗口

Window Aggregation

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT window_start, window_end, SUM(price)FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

并且支持多级窗口聚合 

-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))GROUP BY supplier_id, window_start, window_end, window_time;-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_price) as total_priceFROM TABLE(TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

下面是分组窗口聚合的写法,分组窗口聚合已经过时,官网不推荐使用了

SELECTuser,TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,SUM(amount) FROM Orders
GROUP BYTUMBLE(order_time, INTERVAL '1' DAY),user

Window Join

在流式查询中,与其他连续表上的关联不同,窗口关联不产生中间结果,只在窗口结束产生一个最终的结果。另外,窗口关联会清除不需要的中间状态

目前使用时有一些限制:

目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件

目前,关联的左右两边必须使用相同的窗口表值函数。这个规则在未来可以扩展,比如:滚动和滑动窗口在窗口大小相同的情况下 join。

语法上支持 INNER、LEFT、RIGHT、FULL OUTER、ANTI、SEMI JOIN。而且,窗口关联可以在其他基于 窗口表值函数 的操作后使用,例如 窗口聚合,窗口 Top-N 和 窗口关联

SELECT l.id as l_id,r.id as r_id,l.window_start,l.window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE t_left, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) l
INNER JOIN (SELECT * FROM TABLE(TUMBLE(TABLE t_right, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) r
ON l.id = r.id 
AND l.window_start = r.window_start 
AND l.window_end = r.window_end;

Window TopN

与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态

窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好

SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 3;

还可以在窗口聚合后在进行窗口 Top-N

SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM (SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cntFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, supplier_id)) WHERE rownum <= 3;

Window Deduplication

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据

对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好

Window Deduplication是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的(目前只支持根据事件时间属性进行排序),支持在其他窗口操作上进行去重操作,比如 窗口聚合,窗口TopN 和 窗口关联

SELECT *FROM (SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 1;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/254300.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第5章 数据库操作

学习目标 了解数据库&#xff0c;能够说出数据库的概念、特点和分类 熟悉Flask-SQLAlchemy的安装&#xff0c;能够在Flask程序中独立安装扩展包Flask-SQLAlchemy 掌握数据库的连接方式&#xff0c;能够通过设置配置项SQLALCHEMY_DATABASE_URI的方式连接数据库 掌握模型的定义…

rust语言tokio库底层原理解析

目录 1 rust版本及tokio版本说明1 tokio简介2 tokio::main2.1 tokio::main使用多线程模式2.2 tokio::main使用单线程模式 3 builder.build()函数3.1 build_threaded_runtime()函数新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图…

【开源】JAVA+Vue.js实现高校实验室管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…

使用 Docker 镜像预热提升容器启动效率详解

概要 在容器化部署中,Docker 镜像的加载速度直接影响到服务的启动时间和扩展效率。本文将深入探讨 Docker 镜像预热的概念、必要性以及实现方法。通过详细的操作示例和实践建议,读者将了解如何有效地实现镜像预热,以加快容器启动速度,提高服务的响应能力。 Docker 镜像预热…

【数据结构】堆(创建,调整,插入,删除,运用)

目录 堆的概念&#xff1a; 堆的性质&#xff1a; 堆的存储方式&#xff1a; 堆的创建 &#xff1a; 堆的调整&#xff1a; 向下调整&#xff1a; 向上调整&#xff1a; 堆的创建&#xff1a; 建堆的时间复杂度&#xff1a; 向下调整&#xff1a; 向上调整&#xff…

红队打靶练习:GLASGOW SMILE: 1.1

目录 信息收集 1、arp 2、nmap 3、nikto 4、whatweb 目录探测 1、gobuster 2、dirsearch WEB web信息收集 /how_to.txt /joomla CMS利用 1、爆破后台 2、登录 3、反弹shell 提权 系统信息收集 rob用户登录 abner用户 penguin用户 get root flag 信息收集…

HARRYPOTTER: FAWKES

攻击机 192.168.223.128 目标机192.168.223.143 主机发现 nmap -sP 192.168.223.0/24 端口扫描 nmap -sV -p- -A 192.168.223.143 开启了21 22 80 2222 9898 五个端口&#xff0c;其中21端口可以匿名FTP登录&#xff0c;好像有点说法,百度搜索一下发现可以用anonymous登录…

网络安全产品之认识准入控制系统

文章目录 一、什么是准入控制系统二、准入控制系统的主要功能1. 接入设备的身份认证2. 接入设备的安全性检查 三、准入控制系统的工作原理四、准入控制系统的特点五、准入控制系统的部署方式1. 网关模式2. 控制旁路模式 六、准入控制系统的应用场景七、企业如何利用准入控制系统…

使用PDFBox实现pdf转其他图片格式

最近在做一个小项目&#xff0c;项目中有一个功能要把pdf格式的图片转换为其它格式&#xff0c;接下来看看用pdfbox来如何实现吧。 首先导入pdfbox相关依赖&#xff1a; <dependency> <groupId>org.apache.pdfbox</groupId> <artifactId>pdfbox</a…

【高阶数据结构】位图布隆过滤器

文章目录 1. 位图1.1什么是位图1.2为什么会有位图1.3 实现位图1.4 位图的应用 2. 布隆过滤器2.1 什么是布隆过滤器2.2 为什么会有布隆过滤器2.3 布隆过滤器的插入2.4 布隆过滤器的查找2.5 布隆过滤器的模拟实现2.6 布隆过滤器的优点2.7 布隆过滤器缺陷 3. 海量数据面试题3.1 哈…

CTFshow web(命令执行29-36)

?ceval($_GET[shy]);&shypassthru(cat flag.php); #逃逸过滤 ?cinclude%09$_GET[shy]?>&shyphp://filter/readconvert.base64-encode/resourceflag.php #文件包含 ?cinclude%0a$_GET[cmd]?>&cmdphp://filter/readconvert.base64-encode/…

Kubernetes实战(二十七)-HPA实战

1 HPA简介 HPA 全称是 Horizontal Pod Autoscaler&#xff0c;用于POD 水平自动伸缩&#xff0c; HPA 可以 基于 POD CPU 利用率对 deployment 中的 pod 数量进行自动扩缩容&#xff08;除了 CPU 也可以基于自定义的指标进行自动扩缩容&#xff09;。pod 自动缩放不适用于无法…

ubuntu22.04@laptop OpenCV Get Started: 001_reading_displaying_write_image

ubuntu22.04laptop OpenCV Get Started: 001_reading_displaying_write_image 1. 源由2. Read/Display/Write应用Demo2.1 C应用Demo2.2 Python应用Demo 3. 过程分析3.1 导入OpenCV库3.2 读取图像文件3.3 显示图像3.4 保存图像文件 4. 总结5. 参考资料 1. 源由 读、写、显示图像…

Windows - URL Scheme - 在Windows上无管理员权限为你的程序添加URL Scheme

Windows - URL Scheme - 在Windows上无管理员权限为你的程序添加URL Scheme What 想不想在浏览器打开/控制你的电脑应用&#xff1f; 比如我在浏览器地址栏输入wegame://后回车会提示是否打开URL:wegame Portocol。 若出现了始终允许选项&#xff0c;你甚至可以写一个Web界面…

【AIGC核心技术剖析】DreamCraft3D一种层次化的3D内容生成方法

DreamCraft3D是一种用于生成高保真、连贯3D对象的层次化3D内容生成方法。它利用2D参考图像引导几何塑造和纹理增强阶段&#xff0c;通过视角相关扩散模型执行得分蒸馏采样&#xff0c;解决了现有方法中存在的一致性问题。使用Bootstrapped Score Distillation来提高纹理&#x…

React 实现表单组件

表单是html的基础元素&#xff0c;接下来我会用React实现一个表单组件。支持包括输入状态管理&#xff0c;表单验证&#xff0c;错误信息展示&#xff0c;表单提交&#xff0c;动态表单元素等功能。 数据状态 表单元素的输入状态管理&#xff0c;可以基于react state 实现。 …

计算机网络——04接入网和物理媒体

接入网和物理媒体 接入网络和物理媒体 怎样将端系统和边缘路由器连接&#xff1f; 住宅接入网络单位接入网络&#xff08;学校、公司&#xff09;无线接入网络 住宅接入&#xff1a;modem 将上网数据调制加载到音频信号上&#xff0c;在电话线上传输&#xff0c;在局端将其…

Ubuntu22.04 gnome-builder gnome C 应用程序习练笔记(一)

一、序言 gnome-builder构建器是gnome程序开发的集成环境&#xff0c;支持主力语言C, C, Vala, jscript, python等&#xff0c;界面以最新的 gtk 4.12 为主力&#xff0c;将其下版本的gtk直接压入了depreciated&#xff0c;但gtk4.12与普遍使用的gtk3有很大区别&#xff0c;原…

Java stream 流的基本使用

Java stream 的基本使用 package com.zhong.streamdemo.usestreamdemo;import jdk.jfr.DataAmount; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.util.ArrayList; import java.util.Comparator; import java.util.Li…

elasticsearch重置密码操作

安装es的时候需要测试这个url&#xff1a;http://127.0.0.1:9200/ 出现弹窗让我输入账号和密码。我第一次登录&#xff0c;没有设置过账号和密码&#xff0c; 解决方法是&#xff1a;在es的bin目录下打开cmd窗口&#xff0c;敲命令&#xff1a;.\elasticsearch-reset-password…