【大数据】Flink SQL 语法篇(四):Group 聚合

Flink SQL 语法篇(四):Group 聚合

  • 1.基础概念
  • 2.窗口聚合和 Group 聚合
  • 3.SQL 语义
  • 4.Group 聚合支持 Grouping sets、Rollup、Cube

1.基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

2.窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

3.SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

4.Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/263978.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度神经网络中的计算和内存带宽

深度神经网络中的计算和内存带宽 文章目录 深度神经网络中的计算和内存带宽来源原理介绍分析1:线性层分析2:卷积层分析3:循环层总结 来源 相关知识来源于这里。 原理介绍 Memory bandwidth and data re-use in deep neural network computat…

S32 Design Studio PE工具配置UART

配置操作 先得配置下GPIO用的是哪个引脚 跟之前的外设一样,它这里有两个UART可以用。 这里只能用UART0和2的原因是UART1被LIN使用了。 配置的内容为 生成代码对应 首先会根据components名称创建个驱动状态结构体 /*! Driver state structure */ lpuart_state_t …

程序媛的mac修炼手册-- 2024如何彻底卸载Python

啊,前段时间因为想尝试chatgpt的API,需要先创建一个python虚拟环境来安装OpenAI Python library. 结果,不出意外的出意外了,安装好OpenAI Python library后,因为身份认证问题,根本就没有获取API key的权限…

stable diffusion学习笔记 手部修复

图片手部修复原理 某张图片在生成后,仅有手部表现不符合预期(多指,畸形等)。这种情况下我们通常使用【局部重绘】的方式对该图片的手部进行【图生图】操作,重新绘制手部区域。 但是仅采用重绘的方式也很难保证生成的…

试卷打印如何去除答案?3个方法一键还原

试卷打印如何去除答案?在日常学习中,在打印试卷时,去除答案是一个常见的需求,特别是在学生复习或老师准备教学材料时。答案的存在可能会干扰学生的学习过程,或者使得试卷的重复使用变得困难。为了解决这个问题&#xf…

JWT学习笔记

了解 JWT Token 释义及使用 | Authing 文档 JSON Web Token Introduction - jwt.io JSON Web Token (JWT,RFC 7519 (opens new window)),是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准((RFC 7519)。该 token 被设计为紧凑…

Another Redis Desktop Manager工具连接集群

背景:使用Another Redis Desktop Manager连接redsi集群 win10安装 使用 下载 某盘: 链接:https://pan.baidu.com/s/1dg9kPm9Av8-bbpDfDg9DsA 提取码:t1sm 使用

[c++] char * 和 std::string

1 char * 和 std::string 的区别 char * 字符串是常量字符串,不能修改;std::string 指向的字符串可以修改 实例代码如下图所示,s1 和 s2 均是常量字符串,字符串常量保存在只读数据区,是只读的,不能写&…

接口自动化测试用例如何设计

说到自动化测试,或者说接口自动化测试,多数人的第一反应是该用什么工具,比如:Python Requests、Java HttpClient、Apifox、MeterSphere、自研的自动化平台等。大家似乎更关注的是哪个工具更优秀,甚至出现“ 做平台的 &…

kafka生产者2

1.数据可靠 • 0:生产者发送过来的数据,不需要等数据落盘应答。 风险:leader挂了之后,follower还没有收到消息。。。。 • 1:生产者发送过来的数据,Leader收到数据后应答。 风险:leader应答…

Pyglet综合应用|推箱子游戏地图编辑器之图片跟随鼠标

目录 推箱子游戏 升级一:鼠标操作 升级二:增加网格 升级三:模拟按钮 综合应用:地图编辑器 关卡地图洗数 推箱子游戏 本篇为之前写的博客《Pyglet综合应用|推箱子游戏之关卡图片载入内存》的续篇,内容…

剪辑视频调色怎么让画质变得清晰 视频剪辑调色技巧有哪些方面 剪辑视频免费的软件有哪些 会声会影调色在哪里 会声会影模板素材

视频调色的作用有很多,除了进行风格化剪辑以外,还可以让作品的画质变得清晰。通过调色来增强画面的清晰度,在观感上也会显得十分自然。视频调色的技巧有很多,并且原理大都十分简单。有关剪辑视频调色怎么让画质变得清晰&#xff0…

Linux之部署前后端分离项目

Nginx配置安装 1.安装依赖 我们这里安装的依赖是有4个的 [rootlocalhost opt]# yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel 2.上传解压安装包 [rootlocalhost opt]# tar -xvf nginx-1.13.7.tar.gz -C /usr/local/java/3.安装Nginx &#xff0…

[C++]使用C++部署yolov9的tensorrt模型进行目标检测

部署YOLOv9的TensorRT模型进行目标检测是一个涉及多个步骤的过程,主要包括准备环境、模型转换、编写代码和模型推理。 首先,确保你的开发环境已安装了NVIDIA的TensorRT。TensorRT是一个用于高效推理的SDK,它能对TensorFlow、PyTorch等框架训…

Windows系统搭建Elasticsearch引擎结合内网穿透实现远程连接查询数据

文章目录 系统环境1. Windows 安装Elasticsearch2. 本地访问Elasticsearch3. Windows 安装 Cpolar4. 创建Elasticsearch公网访问地址5. 远程访问Elasticsearch6. 设置固定二级子域名 Elasticsearch是一个基于Lucene库的分布式搜索和分析引擎,它提供了一个分布式、多…

javaApI(Application Programming Interface)应用程序编程接口

ApI概念 Apl:指的是官方给开发人员提供的说明文档,对语言中有哪些类,类中有哪些方法进行说明 Objict 类 java.lang.Object 是java类体系结构中最顶层的类 Object可以表示java中任意的类 Object类中的方法 toString() 输出一个对象,但是…

适合新手博主站长使用的免费响应式WordPress博客主题JianYue

这款JianYue主题之所以命名为 JianYue,意思就是简单而不简约的。是根据Blogs主题优化而成,剔除了一些不必要的功能及排版,仅保留一种博客布局,让新手站长能够快速手上WordPress。可以说这款主题比较适合新手博主站长使用&#xff…

【Linux】部署前后端分离项目---(Nginx自启,负载均衡)

目录 前言 一 Nginx(自启动) 2.1 Nginx的安装 2.2 设置自启动Nginx 二 Nginx负载均衡tomcat 2.1 准备两个tomcat 2.1.1 复制tomcat 2.1.2 修改server.xml文件 2.1.3 开放端口 2.2 Nginx配置 2.2.1 修改nginx.conf文件 2.2.2 重启Nginx服务 2…

【MySQL】探索表结构、数据类型和基本操作

表、记录、字段 数据库的E-R(entity-relationship,实体-关系)模型中有三个主要概念: 实体集 、 属性 、 关系集 。 一个实体集对应于数据库中的一个表,一个实体则对应于数据库表 中的一行,也称为一条记录。…

adb-连接模拟器和真机操作

目录 1. 连接模拟器(夜神模拟器示例) 1.1 启动并连接模拟器 1.2 开启调试模式 2. USB连接真机调试 2.1 usb数据线连接好电脑,手机打开调试模式 2.2 输入adb devices检测手机 3. Wifi连接真机调试 3.1 USB连接手机和电脑 3.2 运行 adb…