Hive SQL ——窗口函数源码阅读

前言

   使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作,BE内存溢出问题频发(忘记当时指定的BE内存上限是多少了.....),此时才意识到,开窗操作,如果使用 不当,反而更容易引发性能问题。 下文是对Hive中的窗口函数底层源码进行初步学习,若有问题,请指正~

一、窗口函数的执行步骤

(1)将数据分割成多个分区;

(2)在各个分区上调用窗口函数;

   由于窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),因此Hive社区引入分区表函数  Partitioned Table Function(PTF)。

  简略的代码流转图:

  hive会把QueryBlock,翻译成执行操作数OperatorTree,其中每个operator都会有三个重要的方法:

  • initializeOp() :初始化算子
  • process() :执行每一行数据
  • forward() :把处理好的每一行数据发送到下个Operator

   当遇到窗口函数时,会生成PTFOperator,PTFOperator依赖PTFInvocation 读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;

   WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。

二、源码分析

2.1 PTFOperator 类

   是PartitionedTableFunction的运算符,继承Operator抽象类(Hive运算符基类)

重写process(Object row, int tag) 方法,该方法来处理一行数据Row

@Overridepublic void process(Object row, int tag) throws HiveException {if (!isMapOperator) {/** check if current row belongs to the current accumulated Partition:* - If not:*  - process the current Partition*  - reset input Partition* - set currentKey to the newKey if it is null or has changed.*/newKeys.getNewKey(row, inputObjInspectors[0]);//会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)boolean keysAreEqual = (currentKeys != null && newKeys != null) ?newKeys.equals(currentKeys) : false;// 如果不相等,就结束当前partition分区的数据累积,触发窗口计算if (currentKeys != null && !keysAreEqual) {// 关闭正在积累的分区ptfInvocation.finishPartition();}// 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeysif (currentKeys == null || !keysAreEqual) {// 开启一个新的分区partitionptfInvocation.startPartition();if (currentKeys == null) {currentKeys = newKeys.copyKey();} else {currentKeys.copyKey(newKeys);}}} else if (firstMapRow) { // 说明当前row是进入的第一行ptfInvocation.startPartition();firstMapRow = false;}// 将数据row添加到分区中,积累数据ptfInvocation.processRow(row);}

   上面的代码可以看出,所有数据应该是按照分区排好了序,排队进入process方法,当遇到进入的row和当前分区不是同一个key时,当前分区就可以关闭了,然后在打开下一个分区。

2.2 PTFInvocation类

  PTFInvocationPTFOperator类 的内部类

 在PTFOperator的初始化方法中创建了实例。

@Overrideprotected void initializeOp(Configuration jobConf) throws HiveException {...ptfInvocation = setupChain();ptfInvocation.initializeStreaming(jobConf, isMapOperator);...}

   它的主要作用是负责PTF 数据链中行( row)的流动,通过 ptfInvocation.processRow(row) 方法调用传递链中的每一行,并且通过ptfInvocation.startPartition()、ptfInvocation.finishPartition()方法来通知分区何时开始何时结束。

 该类中包含TableFunction,用来处理分区数据。

PTFPartition inputPart; // inputPart理解为:分区对象,一直是在复用一个inputPart
TableFunctionEvaluator tabFn; // tabFn理解为:窗口函数的实例//向分区中添加一行数据
void processRow(Object row) throws HiveException {if (isStreaming()) {// tabFn是窗口函数的实例handleOutputRows(tabFn.processRow(row));} else {// inputPart就是当前正在累积数据的分区inputPart.append(row);}
}// 开启一个分区
void startPartition() throws HiveException {if (isStreaming()) {tabFn.startPartition();} else {if (prev == null || prev.isOutputIterator()) {if (inputPart == null) {// 创建新分区对象:PTFPartition对象createInputPartition();} else {// 重置分区inputPart.reset();}}}if (next != null) {next.startPartition();}
}// 关闭一个分区
void finishPartition() throws HiveException {if (isStreaming()) {handleOutputRows(tabFn.finishPartition());} else {if (tabFn.canIterateOutput()) {outputPartRowsItr = inputPart == null ? null :tabFn.iterator(inputPart.iterator());} else {// tabFn是窗口函数的实例,execute方法:执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象outputPart = inputPart == null ? null : tabFn.execute(inputPart);outputPartRowsItr = outputPart == null ? null : outputPart.iterator();}if (next != null) {if (!next.isStreaming() && !isOutputIterator()) {next.inputPart = outputPart;} else {if (outputPartRowsItr != null) {while (outputPartRowsItr.hasNext()) {next.processRow(outputPartRowsItr.next());}}}}if (next != null) {next.finishPartition();} else {if (!isStreaming()) {if (outputPartRowsItr != null) {while (outputPartRowsItr.hasNext()) {// 将窗口函数计算结果逐条输出到下一个Operator中forward(outputPartRowsItr.next(), outputObjInspector);}}}}
}

2.3 PTFPartition类

   该类表示由TableFunctionWindowFunction来处理的行集合,使用PTFRowContainer来保存数据。

private final PTFRowContainer<List<Object>> elems; // 存放数据的容器public void append(Object o) throws HiveException {//在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值(21亿),会抛异常。if (elems.rowCount() == Integer.MAX_VALUE) {throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",Integer.MAX_VALUE));}@SuppressWarnings("unchecked")List<Object> l = (List<Object>)ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);elems.addRow(l);
}

2.4 TableFunctionEvaluator类

   该类负责对分区内的数据做实际的窗口计算

public abstract class TableFunctionEvaluator {
transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化// iPart理解为:分区对象
public PTFPartition execute(PTFPartition iPart)throws HiveException {if (ptfDesc.isMapSide()) {return transformRawInput(iPart);}PTFPartitionIterator<Object> pItr = iPart.iterator();PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);if (outputPartition == null) {outputPartition = PTFPartition.create(ptfDesc.getCfg(),tableDef.getOutputShape().getSerde(),OI, tableDef.getOutputShape().getOI());} else {outputPartition.reset();}// 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartitionexecute(pItr, outputPartition);return outputPartition;
}protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
}

 抽象方法 execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction

public class WindowingTableFunction extends TableFunctionEvaluator {@Override
public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {ArrayList<List<?>> oColumns = new ArrayList<List<?>>();PTFPartition iPart = pItr.getPartition();StructObjectInspector inputOI = iPart.getOutputOI();WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {// 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之trueboolean processWindow = processWindow(wFn.getWindowFrame());pItr.reset();if (!processWindow) {Object out = evaluateFunctionOnPartition(wFn, iPart);if (!wFn.isPivotResult()) {out = new SameList(iPart.size(), out);}oColumns.add((List<?>) out);} else {oColumns.add(executeFnwithWindow(wFn, iPart));}}/** Output Columns in the following order* - the columns representing the output from Window Fns* - the input Rows columns*/for (int i = 0; i < iPart.size(); i++) {ArrayList oRow = new ArrayList();Object iRow = iPart.getAt(i);for (int j = 0; j < oColumns.size(); j++) {oRow.add(oColumns.get(j).get(i));}for (StructField f : inputOI.getAllStructFieldRefs()) {oRow.add(inputOI.getStructFieldData(iRow, f));}//最终将处理好的数据逐条添加到输出PTFPartition中outP.append(oRow);}
}// Evaluate the function result for each row in the partition
ArrayList<Object> executeFnwithWindow(WindowFunctionDef wFnDef,PTFPartition iPart)throws HiveException {ArrayList<Object> vals = new ArrayList<Object>();for (int i = 0; i < iPart.size(); i++) {// 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象Object out = evaluateWindowFunction(wFnDef, i, iPart);vals.add(out);}return vals;
}// Evaluate the result given a partition and the row number to process
private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)throws HiveException {BasePartitionEvaluator partitionEval = wFn.getWFnEval().getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);// 给定当前行,获取窗口的聚合return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
}}

注:WindowingTableFunction类中的execute方法 ,没怎么理解清楚,待补充~

三、Hive SQL窗口函数实现原理

    window Funtion的使用语法:

select col1,col2,row_number() over (partition by col1 order by col2 窗口子句) as rnfrom tableA

上面的语句主要分两部分

  • window函数部分(window_func)

  • 窗口定义部分

3.1 window函数部分

   windows函数部分即是:在窗口上执行的函数。主要有count 、sum、avg聚合类窗口函数、还有常用的row_number、rank这样的排序函数。

3.2  窗口定义部分

即为: over里面的三部分内容(均可省略不写)

  • partition by 分区

  • order by 排序

  • (rows | range )between ... and .....  窗口子句

ps :Hive 窗口函数的详细介绍:

(07)Hive——窗口函数详解_hive 窗口函数-CSDN博客

3.3  window Function实现原理

   窗口函数的实现,主要借助 Partitioned Table Function (即PTF);

(1)PTF的输入可以是:表、子查询或另一个PTF函数输出;

(2)PTF输出是一张表。

写一个相对复杂的sql,来看一下执行窗口函数时,数据的流转情况:

select id,sq,cell_type,rank,row_number() over(partition by id  order by rank ) as rn ,rank() over(partition by id order by rank) as r,dense_rank() over(partition by  cell_type order by id) as dr  from window_test_table group byid,sq,cell_type,rank;

数据流转如下图:

以上代码实现主要有三个阶段:

  • 计算除窗口函数以外所有的其他运算,如:group by,join ,having等。上面的代码的第一阶段即为:

selectid, sq, cell_type, rank
from window_test_table
group byid, sq, cell_type, rank;
  • 将第一步的输出作为第一个 PTF 的输入,计算对应的窗口函数值。上面代码的第二阶段即为:

select id,sq,cell_type,rank,rn,r 
from 
window(<w>,--将第一阶段输出记为wpartition by id, --分区order by rank, --窗口函数的order[rn:row_number(),r:rank()] --窗口函数调用)

   由于row_number(),rank() 两个函数对应的窗口是相同的(partition by id  order by rank),因此,这两个函数可以在一次shuffle中完成。

  • 将第二步的输出结果作为 第二个PTF 的输入,计算对应的窗口函数值。上面代码的第三阶段即为:

select id,sq,cell_type,rank,rn,r,dr
from 
window(<w1>,--将第二阶段输出记为w1partition by cell_type, --分区order by id, --窗口函数的order[dr:dense_rank()] --窗口函数调用)

    由于dense_rank()的窗口与前两个函数不同,因此需要再partition一次,得到最终的输出结果。

     总结:上述代码显示需要shuffle三次才能得到最终的结果(第一阶段的group by ,第二阶段,第三阶段的开窗操作)。对应到MapReduce程序,即需要经历三次 map->reduce组合;对应到spark sql上,需要Exchange三次,再加上中间排序操作,在数据量很大的情况下,效率上确实会有较大的影响。

四、窗口函数的性能问题

   在使用Hive进行数据处理时,借助窗口函数可以对数据进行分组、排序等操作,但是在使用row_number这类窗口函数时,会遇到性能较慢的问题,j即比普通的聚合函数( sum,min,max等)运行成本更高,为啥?

4.1 性能问题产生原因

4.1.1 第一个版本

小破站一个up主给出的答案:

 原因:

(1)开窗函数不能做预聚合 ,数据量很多,shuffle慢,计算慢,并且会有

数据倾斜的风险;

(2)开窗多一步order by ,更耗时间;

4.1.2 第二个版本

原因:

(1)普通的聚合函数语句,可以根据函数不同,采用partial + merge 的方式运行,也即是map端预聚合;但那是window 窗口语句只能在reduce 端一次性聚合,即只有complete 执行模式。

(2)普通聚合函数的物理执行计划分为SortBased和HashBased的;而window是SortBased。

(3)window语句作用于 对行,并为每行返回一个聚合结果,这决定了window在执行过程中需要更大的buffer 进行汇总。

4.2 性能问题的优化方法

4.2.1 用聚合函数替代 排序开窗函数

     例如:假设需要求出历史至今用户粒度末次交易的sku名称或者交易金额等,这种情况下,可以将 交易时间和sku名称拼接起来,取max ,之后再将sku名称拆解开,即能达到预期效果。

    在Hive 中,row_number是一个常用的窗口函数,用于为结果集中的每一行分配一个唯一的数字。通常会搭配over子句来指定窗口的范围和排序方式。例如:

select col1,col2,row_number() over (partition by col1 order by col2  窗口子句) as rnfrom tableA

   上述示例row_number 函数将根据col1进行分组,并按照col2的值进行排序,为每一组数据分配一个唯一的行号。然而,在处理大规模数据时,使用row_number可能会导致性能下降,这是因为row_number 需要对数据进行排序和标记,而这些操作在大数据量下会消耗较多的计算资源。

   注: 以下都是row_number() over () 开窗函数性能优化的几种方式:

4.2.2 减少数据量

   一种最直接的优化方法是减少需要进行row_number计算的数据量。可以通过在where子句中添加条件、对数据进行分区等方式来减小数据规模,从而提升计算性能。

   ps: 这种方式在生产环境中用过。

4.2.3 避免多次排序

   在使用row_number时,尽量避免多次排序操作。可以将row_number 函数应用在子查询中,然后再进行排序操作,避免重复的排序过程。

selectcol1,col2,rn
from 
( select col1,col2,row_number() over (partition by col1 order by col2) as rnfrom tableA) tmp1
order by col1,col2;

参考文章:

常用的SQL优化方式, 用聚合函数替代排序开窗求最值, sparksql, hivesql_哔哩哔哩_bilibili

https://blog.51cto.com/u_16213435/9877979

Hive学习(一)窗口函数源码阅读_hive 源码阅读-CSDN博客

https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/394888.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1. js混淆-源码乱码

目录 调试干扰参数逆向 调试干扰 打开开发者工具&#xff0c;首先会进入 setInterval 生成的 debugger 将 uzt.js uyt.js 内容替换 将这两个文件的内容置空&#xff0c;并刷新页面就可以正常调试了 参数逆向 点击翻页&#xff0c;可以发现 https://match.yuanrenxue.cn/api…

Arduino导入实例程序的过程,实例包文件却编译显示缺失文件

参考中实例程序中的readme.txt 导入方式 下面是文档中的使用方式 1.基本信息&#xff1a; 本例程是基于Arduino进行开发的&#xff0c;例程均在E-Paper ESP8266 Driver Board上进行了验证;2.基本使用&#xff1a;方法1&#xff1a;将整个esp8266-waveshare-epd文件夹复制到C…

【Go】通过反射解析对象tag信息,实现简易ORM

反射是运行时&#xff0c;需要在运行时解析类型信息&#xff0c;编译期无法优化这些操作&#xff0c;因此比编译时已知类型信息的直接调用效率要低。 package mainimport ("fmt""reflect""strings" )type Person struct {Name string json:&quo…

PicGo + gitee 免费搭建个人图床

目录 1 图床概念2 使用gitee和PicGo搭建图床流程2.1 下载安装PicGo工具 3 图片上传错误处理3.1 PicGo客户端提示404错误信息图片上传失败3.2 PicGo客户端提示400错误信息图片上传失败 1 图床概念 ​ "图床"是一个网络术语&#xff0c;它指的是一种用于存储和托管图片…

理解张量拼接(torch.cat)

拼接 维度顺序&#xff1a;对于 3D 张量&#xff0c;通常可以理解为 (深度, 行, 列) 或 (批次, 行, 列)。 选择一个dim进行拼接的时候其他两个维度大小要相等 对于三维张量&#xff0c;理解 torch.cat 的 dim 参数确实变得更加抽象&#xff0c;但原理是相同的。让我们通过一…

算法力扣刷题记录 六十九【动态规划基础及509. 斐波那契数】

前言 调整一下做题顺序&#xff0c;多个章节同步进行&#xff0c;穿插练习。可以在各章节的专栏中找同一类。 记录 六十九【动态规划基础】。 一、动态规划理论基础学习 参考学习链接 二、509. 斐波那契数 2.1 题目阅读 斐波那契数 &#xff08;通常用 F(n) 表示&#x…

html+css+js网页设计 中国移动5个页面(带js)

htmlcssjs网页设计 中国移动5个页面&#xff08;带js&#xff09; 网页作品代码简单&#xff0c;可使用任意HTML编辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改编辑等操作&#xf…

Cpp中的this指针--复习记录

1.什么是this指针? 每个类都有一个this指针&#xff0c;我们的非静态成员函数可以通过这个this指针来操作对象的成员属性。this指针存储的就是类的实例的地址&#xff0c;this指针时时刻刻指向的都是这个实例对象本身。 由下图可知: 我在主函数中栈上创建了一个类的实例(由操…

【Python-实操】LabelMe to YOLOv8 Converter

LabelMe to YOLOv8 Converter 这是一个 Python 脚本&#xff0c;用于将 LabelMe 标注工具导出的 JSON 文件转换为 YOLOv8 格式的标注文件&#xff0c;并同时在图像上绘制标注的多边形。 功能 读取 LabelMe JSON 文件。解码并显示图像。从 classes.txt 文件加载类别标签。将多…

Java | Leetcode Java题解之第327题区间和的个数

题目&#xff1a; 题解&#xff1a; class Solution {public int countRangeSum(int[] nums, int lower, int upper) {long sum 0;long[] preSum new long[nums.length 1];for (int i 0; i < nums.length; i) {sum nums[i];preSum[i 1] sum;}BalancedTree treap ne…

Java参数传递

Java参数传递 一、 方法重载 一个类中可以存在多个同名的方法&#xff0c;只要这些方法的参数列表不同即可。 参数列表不同&#xff1a;参数个数或者参数类型不同方法重载与修饰符、返回值类型等统统无关&#xff0c;只看参数列表 二、 可变个数的形参 从Java5.0开始&…

陶瓷材质的防静电架空地板越来越受欢迎的原因

目前市面上的陶瓷防静电架空地板主要分为两种&#xff1a;钢基和硫酸钙基。前者是以全钢冲孔裸板作为板基&#xff0c;经粘接、固定整型和灌浆的方式加工而成&#xff0c;后者是以复合硫酸钙板为基材&#xff0c;表面粘接防静电陶瓷砖&#xff0c;四周导电PVC边条封边。近年来陶…

【C++】vector 的模拟实现

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

02_快速启动 Demo 创建 Electron 项目、electron-forge 搭建一个 electron 项目、手动创建electron项目

快速启动 Demo 创建 Electron 项目 一、克隆一个仓库、快速启动一个项目二、electron-forge 搭建一个 electron 项目三、手动搭建一个 electron 项目四、开发工具中配置 Eslint 一、克隆一个仓库、快速启动一个项目 要使用 git 的话首先电脑上面需要安装 git //克隆示例项目的…

Qt3D给圆环等立体图形添加纹理图片

添加纹理图片&#xff0c;首先需要自己找一个纹理图&#xff0c;当然了&#xff0c;随便什么图片都行。 创建3D图形的主要步骤查看另一篇文章。 这里主要代码如下&#xff1a; 使用QTextureLoader加载图片&#xff0c;图片路径需为qrc:/的路径。 auto *planeTransform1 ne…

嵌入式学习day13(C高级Linux命令)

一丶进程管理命令 1.grep 功能&#xff1a;从文件中查找字符串 格式&#xff1a;grep "要查找的字符串" 文件名 精确查找&#xff1a;grep "\<要查找的字符串\>" 文件名 结合ps以及管道&#xff1a;ps -ef | grep a.out: 从进程信息中查找带…

10个理由告诉你,为什么鸿蒙是下一个职业风口!

在当今科技飞速发展的时代&#xff0c;新的技术和趋势不断涌现&#xff0c;为人们带来了前所未有的机遇和挑战。鸿蒙操作系统作为我国自主研发的创新成果&#xff0c;正逐渐成为科技领域的焦点&#xff0c;被认为是下一个职业风口。 10个理由告诉你&#xff0c;为什么鸿蒙是下一…

【海贼王航海日志:前端技术探索】CSS你了解多少?(二)

目录 1 -> 字体属性 1.1 -> 设置字体 1.2 -> 字体大小 1.3 -> 字体粗细 1.4 -> 文字样式 2 -> 文本属性 2.1 -> 文本颜色 2.1.1 -> 认识RGB 2.1.2 -> 设置文本颜色 2.2 -> 文本对齐 2.3 -> 文本装饰 2.4 -> 文本缩进 2.5 -&g…

vue的nextTick是下一次事件循环吗

如题&#xff0c;nextTick的回调是在下一次事件循环被执行的吗&#xff1f; 是不是下一次事件循环取决于nextTick的实现&#xff0c;如果是用的微任务&#xff0c;那么就是本次事件循环&#xff1b;否则如果用的是宏任务&#xff0c;那么就是下一次事件循环。 我们看下Vue3中…

【Canvas与艺术】黄色立体感放射光芒五角星

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>黄色立体感放射光芒五角星</title><style type"text/c…