Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。

函数功能:

在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值:

select max(xxx) from source_table group by key1, key2

上⾯ SQL 的 max 语义产出只有⼀条最终结果,如果想取聚合结果最⼤的 n 条数据,并且 n 条数据,每⼀条都要输出⼀次结果数据,上⾯的 SQL 就没有办法实现了。

所以 UDTAF 为了处理这种场景,可以⾃定义 怎么取 , 取多少条 最终的聚合结果,UDTAF 和 UDAF 是类似的。

在这里插入图片描述

案例场景: 有⼀个饮料表有 3 列,分别是 id、name 和 price,⼀共有 5 ⾏,需要找到价格最⾼的两个饮料,类似于 top2,表值聚合函数,需要遍历所有 5 ⾏数据,输出结果为 2 ⾏数据的⼀个表。

开发流程:

实现 TableAggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的

必须实现以下⽅法:

Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,存储了聚合的中间结果,⽐如在执⾏ max() 时会存储每⼀条中间结果的 max 值;

accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,都会调⽤ accumulate() ⽅法更新 accumulator,⽅法对每⼀条输⼊数据执⾏,⽐如执⾏ max() 时,遍历每⼀条数据执⾏;这个⽅法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,每个⽅法的参数类型可以不同,⽀持变⻓参数。

emitValue(Acc accumulator, Collector collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector collector) :

当所有的数据处理完之后,调⽤ emit ⽅法来计算和输出最终结果,可以⾃定义输出多少条以及怎样输出结果。

对于 emitValue 以及 emitUpdateWithRetract 区别,以 TopN 举例,emitValue 每次都会发送所有的最⼤的 n 个值,⽽这在流式任务中会有性能问题,为提升性能,可以实现 emitUpdateWithRetract ⽅法,这个⽅法在 retract 模式下会增量输出结果,⽐如只在有数据更新时,做到撤回⽼数据,再发送新数据,⽽不需要每次都发出全量的最新数据。

如果同时定义了 emitUpdateWithRetract、emitValue ⽅法,那 emitUpdateWithRetract 会优先于 emitValue ⽅法被使⽤,因为引擎会认为 emitUpdateWithRetract 会更加⾼效,它的输出是增量的。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 回撤流的场景必须实现,在计算回撤数据时调⽤,如果没有实现则会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景必须实现,这个⽅法对优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,从⽽在第⼀阶段先进⾏数据聚合。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型:

默认情况下,⽤户的 Input输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚 合中间结果 createAccumulator() 的返回结果)、 Output输出参数 数据类型( emitValue(Acc acc,Collector<Output输出参数> out) 的 Output输出参数 )会被 Flink 反射获取,但对于accumulator 和 Output输出参数类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意: Input输⼊参数 因为是上游算⼦传⼊的,所以类型信息是确认的,不会出现推导错误的情况),⽐如那些⾮基本类型 POJO 的复杂类型,所以跟 ScalarFunction 和 TableFunction ⼀样, AggregateFunction 提供了TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和accumulator 的类型,两个函数的返回值类型都是 TypeInformation。

  • getResultType() : 即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型;

案例场景: Top2

定义⼀个 TableAggregateFunction 来计算给定列的最⼤的 2 个值

在 TableEnvironment 中注册函数

在 Table API 查询中使⽤函数(当前只在 Table API 中⽀持 TableAggregateFunction)

实现思路:

计算最⼤的 2 个值,accumulator 需要保存当前的最⼤的 2 个值,定义了类 Top2Accum 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,来保证精确⼀次的语义。

Top2 表值聚合函数(TableAggregateFunction)的 accumulate() ⽅法有两个输⼊,第⼀个是 Top2Accum accumulator,另⼀个是⽤户定义的输⼊:输⼊的值 v,尽管 merge() ⽅法在⼤多数聚合类型中不是必须的,但在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() ⽅法。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;/*** 输入数据:* a,1* a,2* a,3* * 输出结果:* res=>:1> +I[a, 1, 1]* res=>:1> -D[a, 1, 1]* res=>:1> +I[a, 2, 1]* res=>:1> +I[a, 1, 2]* res=>:1> -D[a, 2, 1]* res=>:1> -D[a, 1, 2]* res=>:1> +I[a, 3, 1]* res=>:1> +I[a, 2, 2]*/
public class TableAggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String,Integer>> tpStream = source.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0],Integer.parseInt(input.split(",")[1]));}});tEnv.registerFunction("top2", new Top2());Table table = tEnv.fromDataStream(tpStream, "key,value");tEnv.createTemporaryView("SourceTable", table);// 使⽤函数Table res = tEnv.from("SourceTable").groupBy("key").flatAggregate("top2(value) as (v, rank)").select("key, v, rank");tEnv.toChangelogStream(res).print("res=>");env.execute();}/*** Accumulator for Top2.*/public static class Top2Accum {public Integer first;public Integer second;}public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}public void accumulate(Top2Accum acc, Integer v) {if (v > acc.first) {acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.second = v;}}public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {for (Top2Accum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);}}public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {// emit the value and rankif (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}}
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189733.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第24章_mysql性能分析工具的使用

文章目录 1. 数据库服务器的优化步骤2.查看系统性能参数3. 统计SQL的查询成本&#xff1a;last_query_cost4. 定位执行慢的 SQL&#xff1a;慢查询日志4.1 开启慢查询日志参数4.2 查看慢查询数目4.3 测试慢sql语句&#xff0c;查看慢日志4.4 系统变量 log_output&#xff0c; l…

用excel计算一个矩阵的转置矩阵

假设我们的原矩阵是一个3*3的矩阵&#xff1a; 125346789 现在求它的转置矩阵&#xff1a; 鼠标点到一个空白的地方&#xff0c;用来存放结果&#xff1a; 插入-》函数&#xff1a; 选择TRANSPOSE&#xff0c;这个就是求转置矩阵的函数&#xff1a; 点击“继续”&#xff1a…

Windows查看端口占用情况

Windows如何查看端口占用情况 方法1. cmd命令行执行netstat命令&#xff0c;查看端口占用情况 netstat -ano 以上命令输出太多信息&#xff0c;不方便查看&#xff0c;通过如下命令搜索具体端口占用情况&#xff0c;例如&#xff1a;8080端口 netstat -ano | findstr "…

初阶JavaEE(17)Linux 基本使用和 web 程序部署

接上次博客&#xff1a;初阶JavaEE&#xff08;16&#xff09;博客系统&#xff08;Markdown编辑器介绍、博客系统功能、博客系统编写&#xff1a;博客列表页 、博客详情页、实现登录、实现强制登录、显示用户信息、退出登录、发布博客&#xff09;-CSDN博客 目录 Linux 基本…

std::any

一、简介 std::any 可以储存任何可拷贝构造和可销毁的类型的对象。 struct test {test(int a,int b){} };int main(int argc, char *argv[]) {std::any a 1;qDebug() << a.type().name();a 3.14;qDebug() << a.type().name();a true;qDebug() << a.type…

从0到0.01入门React | 010.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

开发者测试2023省赛--UnrolledLinkedList测试用例

测试结果 官方提交结果 EclEmma PITest 被测文件UnrolledLinkedList.java /** This source code is placed in the public domain. This means you can use it* without any restrictions.*/package net.mooctest;import java.util.AbstractList; import java.util.Collectio…

Postman模拟上传文件

如图&#xff0c;在F12抓到的上传文件的请求 那要在postman上模拟这种上传&#xff0c;怎么操作呢&#xff0c;如图&#xff0c;选中【Select File】选取文件上传即可

C++进阶篇4---番外-红黑树

一、红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或 Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路 径会比其他路径长出俩倍&#xff0…

19. 深度学习 - 用函数解决问题

文章目录 Hi&#xff0c; 你好。我是茶桁。 上一节课&#xff0c;我们从一个波士顿房价的预测开始写代码&#xff0c;写到了KNN。 之前咱们机器学习课程中有讲到KNN这个算法&#xff0c;分析过其优点和缺点&#xff0c;说起来&#xff0c;KNN这种方法比较低效&#xff0c;在数…

Leetcode刷题详解—— 有效的数独

1. 题目链接&#xff1a;36. 有效的数独 2. 题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的…

U-Mail邮件中继,让海外邮件沟通更顺畅

在海外&#xff0c;电子邮件是人们主要的通信工具&#xff0c;尤其是商务往来沟通&#xff0c;企业邮箱是标配。这主要是因为西方国家互联网发展较早&#xff0c;在互联网早期&#xff0c;电子邮件技术较为成熟&#xff0c;大家都用电子邮件交流&#xff0c;于是这成了一种潮流…

Jenkins简介及Docker Compose部署

Jenkins是一个开源的自动化服务器&#xff0c;用于自动化构建、测试和部署软件项目。它提供了丰富的插件生态系统&#xff0c;支持各种编程语言和工具&#xff0c;使得软件开发流程更加高效和可靠。在本文中&#xff0c;我们将介绍Jenkins的基本概念&#xff0c;并展示如何使用…

2023年第十六届山东省职业院校技能大赛中职组“网络安全”赛项规程

第十六届山东省职业院校技能大赛 中职组“网络安全”赛项规程 一、赛项名称 赛项名称&#xff1a;网络安全 英文名称&#xff1a;Cyber Security 赛项组别&#xff1a;中职组 专业大类&#xff1a;电子与信息大类 二、竞赛目的 网络空间已经成为陆、海、空、天之后的第…

C/C++满足条件的数累加 2021年9月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C满足条件的数累加 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C满足条件的数累加 2021年9月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 现有n个整数&#xff0c;将其中个位数…

linux安装并配置git连接github

git安装 sudo apt-get install git git信息配置 git config --global uer.name "yourname" git config --global user.email "youremail" 其中&#xff0c;yourname是你在github上配置的用户名&#xff0c;youremail是你在github上设置的邮箱 查看git…

吃透 Spring 系列—MVC部分

目录 ◆ SpringMVC简介 - SpringMVC概述 - SpringMVC快速入门 - Controller中访问容器中的Bean - SpringMVC关键组件浅析 ◆ SpringMVC的请求处理 - 请求映射路径的配置 - 请求数据的接收 - Javaweb常用对象获取 - 请求静态资源 - 注解驱动 标签 ◆ SpringMV…

STL简介+浅浅了解string——“C++”

各位CSDN的uu们好呀&#xff0c;终于到小雅兰的STL的学习了&#xff0c;下面&#xff0c;让我们进入CSTL的世界吧&#xff01;&#xff01;&#xff01; 1. 什么是STL 2. STL的版本 3. STL的六大组件 4. STL的重要性 5. 如何学习STL 6.STL的缺陷 7.为什么要学习string类 …

AIGC专栏8——EasyPhoto 视频领域拓展-让AIGC肖像动起来

AIGC专栏8——EasyPhoto 视频领域初拓展-让AIGC肖像动起来 学习前言源码下载地址技术原理储备Video Inference 功能说明 & 效果展示1、Text2Video功能说明a、实现原理简介b、文到视频UI介绍c、结果展示 2、Image2Video功能说明a、实现原理简介i、单图模式ii、首尾图模式 b、…

react 组件进阶

目标&#xff1a;1.能够使用props接收数据 2.能够实现父子组建之间的通讯 3.能够实现兄弟组建之间的通讯 4.能够给组建添加props校验 5.能够说出生命周期常用的钩子函数 6.能够知道高阶组件的作用 一&#xff0c;组件通讯介绍 组件是独立且封闭的单元&#xff0c;默认情况下&a…