FlinkSQL聚合函数(Aggregate Function)详解

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

在这里插入图片描述

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import java.io.Serializable;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;/*** 输入数据:* a,1,1* a,10,2** 输出结果:* res1=>:1> +I[a, 1.0]* res2=>:1> +I[a, 1.0]* res3=>:1> +I[a, 1.0]** res1=>:1> -U[a, 1.0]* res1=>:1> +U[a, 7.0]* res3=>:1> -U[a, 1.0]* res3=>:1> +U[a, 7.0]* res2=>:1> -U[a, 1.0]* res2=>:1> +U[a, 7.0]*/
public class AggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {@Overridepublic Tuple3<String, Double, Double> map(String input) throws Exception {return new Tuple3<>(input.split(",")[0],Double.parseDouble(input.split(",")[1]),Double.parseDouble(input.split(",")[2]));}});Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");tEnv.createTemporaryView("SourceTable", table);Table res1 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));// 注册函数tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);// Table API 调⽤函数Table res2 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));// SQL API 调⽤函数Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");tEnv.toChangelogStream(res1).print("res1=>");tEnv.toChangelogStream(res2).print("res2=>");tEnv.toChangelogStream(res3).print("res3=>");env.execute();}// ⾃定义⼀个计算权重 avg 的 accmulatorpublic static class WeightedAvgAccumulator implements Serializable {public Double sum = 0.0;public Double count = 0.0;}// 输⼊:Long iValue, Integer iWeightpublic static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {// 创建⼀个 accumulator@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}// 获取返回结果@Overridepublic Double getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}// Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {for (WeightedAvgAccumulator a : it) {acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccumulator acc) {acc.count = 0.0;acc.sum = 0.0;}}
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/188846.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

visual studio 启用DPI识别功能

在开发widow程序时&#xff0c;有时必须将电脑 设置-->显示-->缩放与布局-->更改文本、应用项目的大小-->100%后&#xff0c;程序的画面才能正确运行&#xff0c;居说这是锁定了dpi的原因&#xff0c;需要启dpi识别功能。设置方法如下&#xff1a; 或者

Python开源项目PGDiff——人脸重建(Face Restoration),模糊清晰、划痕修复及黑白上色的实践

python ansconda 等的下载、安装等请参阅&#xff1a; Python开源项目CodeFormer——人脸重建&#xff08;Face Restoration&#xff09;&#xff0c;模糊清晰、划痕修复及黑白上色的实践https://blog.csdn.net/beijinghorn/article/details/134334021 友情提示&#xff1a; …

Vue23组件自定义事件 和 解绑事件

Vue2&3组件自定义事件 和 解绑事件 Vue2组件自定义事件 功能&#xff1a;父组件绑定数据&#xff0c;子组件触发事件。&#xff08;父绑子触发&#xff09; 实现步骤&#xff08;前三步在父组件实现&#xff0c;第四步在子组件实现&#xff09;&#xff1a; 第一步&#…

ArcGIS进阶:栅格计算器里的Con函数使用方法

本实验操作为水土保持功能重要性评价&#xff1a; 所用到的数据包括&#xff1a;土地利用类型数据&#xff08;矢量&#xff09;、植被覆盖度数据&#xff08;矢量&#xff09;和地形坡度数据&#xff08;栅格&#xff09;。 由于实验数据较少&#xff0c;其思路也较为简单&a…

立体相机标定

相机成像过程中涉及的4个坐标系&#xff1a; 1、世界坐标系&#xff1a;由用户定义的三维世界坐标系&#xff0c;描述物体和相机在真实世界中的位置&#xff0c;原点可以任意选择。 2、相机坐标系&#xff1a;以相机的光心为坐标原点&#xff0c;X轴和Y轴平行于图像坐标系的X轴…

OpenHarmony 社区运营报告(2023 年 10 月)

● 截至 2023 年 10 月&#xff0c;OpenHarmony 社区共有 51 家共建单位&#xff0c;累计超过 6200 名贡献者产生 24.2 万多个 PR&#xff0c;2.3 万多个 Star&#xff0c;6.1 万多个 Fork&#xff0c;59 个 SIG。 ● OpenHarmony 4.0 版本如期而至&#xff0c;开发套件同步升级…

【js逆向实战】某sakura动漫视频逆向

写在前面 再写一个逆向实战&#xff0c;后面写点爬虫程序来实现一下。 网站简介与逆向目标 经典的一个视频网站&#xff0c;大多数视频网站走的是M3U8协议&#xff0c;就是一个分段传输&#xff0c;其实这里就有两个分支。 通过传统的m3u8协议&#xff0c;我们可以直接进行分…

建行广东省江门市分行走进农村地区开展反假货币宣传

人民对美好生活的向往&#xff0c;涉及方方面面&#xff0c;小至“钱袋子”安全。建行广东省江门市分行落实当地监管部门部署&#xff0c;积极扛起维护国家金融安全的重要政治责任&#xff0c;深入农村地区开展反假货币宣传工作&#xff0c;助力构建农村反假货币工作长效机制。…

拓扑排序软件设计——ToplogicalSort_app(含有源码、需求分析、可行性分析、概要设计、用户使用手册)

拓扑排序软件设计 前言1. 需求分析2. 可行性分析2.1 简介2.2 技术可行性分析2.2.1 技术实现方案2.2.2 开发人员技能要求2.2.3 可行性 2.3 操作可行性分析2.4 结论 3. 项目报告3.1 修订历史记录3.2 软硬件环境3.3 需求分析3.4 详细设计3.4.1 类设计3.4.2 核心流程描述3.4.3 核心…

安全通信网络(设备和技术注解)

网络安全等级保护相关标准参考《GB/T 22239-2019 网络安全等级保护基本要求》和《GB/T 28448-2019 网络安全等级保护测评要求》 密码应用安全性相关标准参考《GB/T 39786-2021 信息系统密码应用基本要求》和《GM/T 0115-2021 信息系统密码应用测评要求》 1网络架构 1.1保证网络…

Zephyr-7B论文解析及全量训练、Lora训练

文章目录 一、Zephyr&#xff1a;Direct Distillation of LM Alignment1.1 开发经过1.1.1 Zephyr-7B-alpha1.1.2 Zephyr-7B-beta 1.2 摘要1.3 相关工作1.4 算法1.4.1 蒸馏监督微调&#xff08;dSFT&#xff09;1.4.2 基于偏好的AI反馈 (AIF&#xff09;1.4.3 直接蒸馏偏好优化&…

论文阅读[121]使用CAE+XGBoost从荧光光谱中检测和识别饮用水中的有机污染物

【论文基本信息】 标题&#xff1a;Detection and Identification of Organic Pollutants in Drinking Water from Fluorescence Spectra Based on Deep Learning Using Convolutional Autoencoder 标题译名&#xff1a;基于使用卷积自动编码器的深度学习&#xff0c;从荧光光谱…

Clickhouse学习笔记(8)—— 建表优化

数据类型 时间字段 建表时能用数值型或日期时间类型&#xff08;DateTime&#xff09;表示的字段就不要用字符串 因为clickhouse进行分区时一般使用时间字段来进行分区&#xff0c;而将时间字段使用DateTime表示&#xff0c;不需要经过函数转换处理&#xff0c;执行效率高、…

黑窗口连接远程服务

ssh root192.168.x.x 回车输入密码 查看docker docker ps 停止正在运行的服务 docker stop xxxxx 删除服务 docker rm xxxxx 查看镜像 docker images 删除镜像 docker rmi xxxxx 删除镜像 启动并运行整个服务 docker compose up -d jar包名称 idea 使用tcp方式连接docker 配置d…

深度探究深度学习常见数据类型INT8 FP32 FP16的区别即优缺点

定点和浮点都是数值的表示&#xff08;representation&#xff09;&#xff0c;它们区别在于&#xff0c;将整数&#xff08;integer&#xff09;部分和小数&#xff08;fractional&#xff09;部分分开的点&#xff0c;点在哪里。定点保留特定位数整数和小数&#xff0c;而浮点…

webpack babel

构建工具 简介 当我们习惯了在node中编写代码的方式后&#xff0c;在回到前端编写html、css、js这些东西会感觉到各种的不便。比如&#xff1a;不能放心的使用模块化规范&#xff08;浏览器兼容性问题&#xff09;、即使可以使用模块化规范也会面临模块过多时的加载问题。我们…

S7-1200PLC和SMART PLC开放式以太网通信(UDP双向通信)

S7-1200PLC的以太网通信UDP通信相关介绍还可以参考下面文章链接: 博途PLC开放式以太网通信TRCV_C指令应用编程(运动传感器UDP通信)-CSDN博客文章浏览阅读2.8k次。博途PLC开放式以太网通信TSENG_C指令应用,请参看下面的文章链接:博途PLC 1200/1500PLC开放式以太网通信TSEND_…

图书销售数据大屏可视化【可视化项目案例-03】

🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本文选自专栏:可视化技术专栏100例 可视化技术专栏100例,包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不…

k8s 裸金属集群部署metalLB软负载均衡 —— 筑梦之路

metalLB 官方网站 Repo&#xff1a;https://github.com/metallb/metallb 官网&#xff1a;https://metallb.universe.tf/installation metalLB解决什么问题&#xff1f; MetalLB 是一个用于裸机 Kubernetes 集群的负载均衡器实现&#xff0c;使用标准路由协议。 k8s 并没有为裸…

【m98】webrtc vs2017构建带符号的debug库

调试有符号 调试 无符号 试试exe不输出到独立的文件? -】 直接输出到sln下面