【大数据】Flink SQL 语法篇(四):Group 聚合、Over 聚合

Flink SQL 语法篇(四):Group 聚合、Over 聚合

  • 1.Group 聚合
    • 1.1 基础概念
    • 1.2 窗口聚合和 Group 聚合
    • 1.3 SQL 语义
    • 1.4 Group 聚合支持 Grouping sets、Rollup、Cube
  • 2.Over 聚合
    • 2.1 时间区间聚合
    • 2.2 行数聚合

1.Group 聚合

1.1 基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

1.2 窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

1.3 SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

1.4 Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

2.Over 聚合

Over 聚合定义(支持 Batch / Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

那这里我们拿 Over 聚合窗口聚合 做一个对比,其之间的最大不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到。
  • Over 聚合:能够保留原始字段。

注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

  • 应用场景:计算最近一段滑动窗口的聚合结果数据。
  • 实际案例:查询每个产品最近一小时订单的金额总和。
SELECT order_id, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM Orders
  • Over 聚合的语法总结如下:
SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间)。
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合。
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。如下案例所示。

2.1 时间区间聚合

按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 1 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '10','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 1 小时的数据RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

2.2 行数聚合

按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 5 5 行数据的 amount 之和。

CREATE TABLE source_table (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '2','fields.product.min' = '1','fields.product.max' = '2'
);CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 5 行数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM source_table

预跑结果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

SELECT order_id, order_time, amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/264815.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

019 Spring Boot+Vue 电影院会员管理系统(源代码+数据库+文档)

部分代码地址: https://github.com/XinChennn/xc019-cinema 一、系统介绍 cinema项目是一套电影院会员管理系统,使用前后端分离架构开发包含管理员、会员管理、会员卡管理、电影票、消费记录、数据统计等模块 二、所用技术 后端技术栈: …

【Flink精讲】Flink组件通信

主要指三个进程中的通讯 CliFrontendYarnJobClusterEntrypointTaskExecutorRunner Flink内部节点之间的通讯使用Akka,比如JobManager和TaskManager之间。而operator之间的数据传输是利用Netty。 RPC是统称,Akka,Netty是实现 Akka与Ac…

热闹元宵进行中,如何利用VR全景展示民宿品牌形象?

错峰出游闹元宵,元宵节恰逢周末,而且还是春节假期返工之后的首个休息日,不少人都想通过短途度假来缓解“节后综合征”。两位数的特价机票、打折的各种酒店让你实现“旅行自由”,那么如何知道特价酒店服务好不好呢?先别…

Docker Volume

"Ice in my vein" Docker Volume(存储卷) 什么是存储卷? 存储卷就是: “将宿主机的本地文件系统中存在的某个目录,与容器内部的文件系统上的某一目录建立绑定关系”。 存储卷与容器本身的联合文件系统? 在宿主机上的这个与容器形成绑定关系…

实用区块链应用:去中心化投票系统的部署与实施

一、需求分析背景 随着技术的发展,传统的投票系统面临着越来越多的挑战,如中心化控制、透明度不足和易受攻击等问题。为了解决这些问题,我们可以利用区块链技术去中心化、透明性和安全性来构建一个去中心化投票系统。这样的系统能够确保投票过…

vue2.0及起步(前端面试知识积累)

1、需要了解的vue概要知识 1、vue是什么? 一套用于构建用户界面的渐进式JavaScript框架。 为什么vue被称为是渐进式JS框架? 答:Vue允许开发者在不同的项目中以渐进式的方式使用它,这种渐进式表现在以下的方面: 逐步采…

数据可视化--了解数据可视化和Excel数据可视化

目录 1.1科学可视化: 可视化是模式、关系、异常 1.2三基色原理: 三基色:红色、绿色和蓝色 1.3Excel数据可视化 1.3.1 excel数据分析-13个图表可视化技巧 1.3.2 excel数据分析-28个常用可视化图表(video) 1.3.3Excel可视化…

Java面试——锁

​ 公平锁: 是指多个线程按照申请锁的顺序来获取锁,有点先来后到的意思。在并发环境中,每个线程在获取锁时会先查看此锁维护的队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会…

Apache Doris 发展历程、技术特性及云原生时代的未来规划

本文节选自《基础软件之路:企业级实践及开源之路》一书,该书集结了中国几乎所有主流基础软件企业的实践案例,由 28 位知名专家共同编写,系统剖析了基础软件发展趋势、四大基础软件(数据库、操作系统、编程语言与中间件…

js里面有引用传递吗?

一:什么是引用传递 引用传递是相对于值传递的。那什么是值传递呢?值传递就是在传递过程中再复制一份,然后再赋值给变量,例如: let a 2; let b a;在这个代码中,let b a; 就是一个值传递,首先…

深度学习手写字符识别:推理过程

说明 本篇博客主要是跟着B站中国计量大学杨老师的视频实战深度学习手写字符识别。 第一个深度学习实例手写字符识别 深度学习环境配置 可以参考下篇博客,网上也有很多教程,很容易搭建好深度学习的环境。 Windows11搭建GPU版本PyTorch环境详细过程 数…

设计模式(十) - 工厂方式模式

前言 在此前的设计模式(四)简单工厂模式中我们介绍了简单工厂模式,在这篇文章中我们来介绍下工厂方法模式,它同样是创建型设计模式,而且又有些类似,文章的末尾会介绍他们之间的不同。 1.工厂方法模式简介 …

小程序性能优化

背景 在开发小程序的过程中我们发现,小程序的经常会遇到性能问题,尤其是在微信开发者工具的时候更是格外的卡,经过排查发现,卡顿的页面有这么多的js代码需要加载,而且都是在进入这个页面的时候加载,这就会…

面试redis篇-10Redis集群方案-主从复制

在Redis中提供的集群方案总共有三种: 主从复制哨兵模式分片集群主从复制 单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。 主从数据同步原理 Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每…

React18源码: Fiber树的初次创建过程图文详解

fiber树构造(初次创建) fiber树构造的2种情况: 1.初次创建 在React应用首次启动时,界面还没有渲染此时并不会进入对比过程,相当于直接构造一棵全新的树 2.对比更新 React应用启动后,界面已经渲染如果再次发…

软考45-上午题-【数据库】-数据操纵语言DML

一、INSERT插入语句 向SQL的基本表中插入数据有两种方式: ①直接插入元组值 ②插入一个查询的结果值 1-1、直接插入元组值 【注意】: 列名序列是可选的,若是所有列都要插入数值,则可以不写列名序列。 示例: 1-2、插…

跟着cherno手搓游戏引擎【26】Profile和Profile网页可视化

封装Profile: Sandbox2D.h:ProfileResult结构体和ProfileResult容器,存储相应的信息 #pragma once #include "YOTO.h" class Sandbox2D :public YOTO::Layer {public:Sandbox2D();virtual ~Sandbox2D() default;virtual void OnAttach()ove…

微信小程序的医院体检预约管理系统springboot+uniapp+python

本系统设计的目的是建立一个简化信息管理工作、便于操作的体检导引平台。共有以下四个模块: uni-app框架:使用Vue.js开发跨平台应用的前端框架,编写一套代码,可编译到Android、小程序等平台。 语言:pythonjavanode.js…

React Hooks概述及常用的React Hooks介绍

Hook可以让你在不编写class的情况下使用state以及其他React特性 useState ● useState就是一个Hook ● 通过在函数组件里调用它来给组件添加一些内部state,React会在重复渲染时保留这个state 纯函数组件没有状态,useState()用于设置和使用组件的状态属性。语法如下…

传统推荐算法库使用--mahout初体验

文章目录 前言环境准备调用混合总结 前言 郑重声明:本博文做法仅限毕设糊弄老师使用,不建议生产环境使用!!! 老项目缝缝补补又是三年,本来是打算直接重写写个社区然后给毕设使用的。但是怎么说呢&#xff…