Flink SQL 语法篇(四):Group 聚合
- 1.基础概念
- 2.窗口聚合和 Group 聚合
- 3.SQL 语义
- 4.Group 聚合支持 Grouping sets、Rollup、Cube
1.基础概念
Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合,按窗口划分(纵向)就是 窗口聚合。
2.窗口聚合和 Group 聚合
应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 count
、sum
等聚合操作。
那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key
换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:
-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)
转换为 Group 聚合 的写法如下:
-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
- 本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑
allowLateness
)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。 - 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
3.SQL 语义
SQL 语义这里也拿离线和实时做对比,Order
为 Kafka,target_table
为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。
- 数据源算子(
From Order
):数据源算子一直运行,实时的从Order
Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的shuffle
策略是根据group by
中的key
进行发送,相同的key
发到同一个 SubTask(并发) 中。 - Group 聚合算子(
group by key
+sum / count / max / min
):接收到上游算子发的一条一条的数据,去状态state
中找这个key
之前的sum / count / max / min
结果。如果有结果oldResult
,拿出来和当前的数据进行sum / count / max / min
计算出这个key
的新结果newResult
,并将新结果[key, newResult]
更新到state
中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult]
,然后再将新结果发往下游+[key, newResult]
;如果state
中没有当前key
的结果,则直接使用当前这条数据计算sum / max / min
结果newResult
,并将新结果[key, newResult]
更新到state
中,当前是第一次往下游发,则不需要先发回撤消息,直接发送+[key, newResult]
。 - 数据汇算子(
INSERT INTO target_table
):接收到上游发的一条一条的数据,写入到target_table
Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于running
状态的。
4.Group 聚合支持 Grouping sets、Rollup、Cube
Group 聚合也支持 Grouping sets
、Rollup
、Cube
。举一个 Grouping sets
的案例:
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id ),( supplier_id, rating ),( supplier_id ),( product_id, rating ),( product_id ),( rating ),( )
)