Flink SQL TopN语句详解

TopN 定义(⽀持 Batch\Streaming): TopN 对应离线数仓的 row_number(),使⽤ row_number() 对某⼀个分组的数据进⾏排序。

应⽤场景: 根据 某个排序 条件,计算 某个分组 下的排⾏榜数据。

SQL 语法标准:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions];
  • ROW_NUMBER() :标识 TopN 排序⼦句;
  • PARTITION BY col1[, col2…] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据排序取 topN,下述案例中的 partition by key ,根据需求中的搜索关键词(key)做为分区;
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]…] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进⾏排序;
  • WHERE rownum <= N :这个⼦句是必须的,加上这个⼦句,Flink 才能将其识别为 TopN 查询,其中 N 代表 TopN 的条⽬数;
  • [AND conditions] :其他的限制条件也可以加上。

实际案例: 取某个搜索关键词下的搜索热度前 10 名的词条数据。

输⼊数据为搜索词条数据的搜索热度数据,当搜索热度发⽣变化时,会将变化后的数据写⼊到数据源的 Kafka 中:

数据源 schema:-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(⽐如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (name STRING NOT NULL,search_cnt BIGINT NOT NULL,key STRING NOT NULL,row_time timestamp(3),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'filesystem', 'path' = 'file:///Users/hhx/Desktop/source_table.csv','format' = 'csv'
);A,100,a,2021-11-01 00:01:03
A,200,a,2021-11-02 00:01:03
A,300,a,2021-11-03 00:01:03
B,200,b,2021-11-01 00:01:03
B,300,b,2021-11-02 00:01:03
B,400,b,2021-11-03 00:01:03
C,300,c,2021-11-01 00:01:03
C,400,c,2021-11-02 00:01:03
C,500,c,2021-11-03 00:01:03
D,400,d,2021-11-01 00:01:03
D,500,d,2021-11-02 00:01:03
D,600,d,2021-11-03 00:01:03-- 数据汇 schema:
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(⽐如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,`timestamp` TIMESTAMP(3)
) WITH (...
);-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (SELECT key, name, search_cnt, row_time, -- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 2 名ROW_NUMBER() OVER (PARTITION BY key ORDER BY search_cnt desc) AS rownumFROM source_table)
WHERE rownum <= 2

输出结果:

在这里插入图片描述

注意: 包含回撤流。

上⾯ SQL 会翻译成以下三个算⼦

数据源 :数据源即最新的词条下⾯的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进⾏ hash 分发到下游排序算⼦,相同的 key 数据将会发送到⼀个并发中;

排序算⼦ :为每个 Key 维护了⼀个 TopN 的榜单数据,接受到上游的⼀条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加⼊ TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据⽐原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据,之前下发的排名数据被撤回(即回撤数据),然后下发新的排名数据;

数据汇 :接收到上游的数据之后,然后输出到外部存储引擎中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/183902.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java+SpringBoot+LayUI仓库管理系统

一.项目介绍 本项目是使用JavaSpringBoot开发&#xff0c;可以实现仓库的注册、登录&#xff0c;登录后可进入系统&#xff0c;进行客户管理、供应商管理、商品管理、商品退货查询管理、登录日志及退出等几大模块。系统界面采用传统的后台管理界面&#xff0c;界面简单、直观。…

【大数据】NiFi 中的处理器(一):GenerateTableFetch

NiFi 中的处理器&#xff08;一&#xff09;&#xff1a;GenerateTableFetch 1.简介2.应用场景3.示例3.1 案例一&#xff1a;无输入流文件&#xff0c;来源表含增量字段3.2 案例二&#xff1a;无输入流文件&#xff0c;不含增量字段3.3 案例三&#xff1a;无输入流文件&#xf…

Transformer的最简洁pytorch实现

目录 前言 1. 数据预处理 2. 模型参数 3. Positional Encoding 4. Pad Mask 5. Subsequence Mask 6. ScaledDotProductAttention 7. MultiHeadAttention 8. FeedForward Networks 9. Encoder Layer 10. Encoder 11. Decoder Layer 12. Decoder 13. Transformer 1…

【单片机基础小知识-如何通过指针来读写寄存器】

寄存器的本质就是内存&#xff0c;RAM&#xff0c;而指针是可以对内存进行操作的&#xff0c;因此可以通过指针来读写寄存器。 如何读取以下一片地址&#xff1a; 步骤1、首地址 结构体&#xff0c;它所占用的内存空间大小与它内部成员有关。 构造一个28字节的类型 type…

计算机服务器中了locked勒索病毒怎么办,勒索病毒解密,数据恢复

随着网络技术的不断成熟&#xff0c;网络中存在的病毒威胁也不断增多&#xff0c;近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业的计算机服务器数据库遭到了勒索病毒攻击&#xff0c;并且勒索病毒的攻击与加密形式也发生了许多变化。其中攻击次数较…

python对Windows如何进行关机/重启?

用CMD命令进行关机/重启步骤&#xff1a; 1.winR&#xff0c;换出输入框 2.在输入框输入命令&#xff0c;如关机&#xff1a;shutdown -s -t 20&#xff0c;该命令是20秒后关机。 命令说明 -s 关机 -r 重启 -t 时间&#xff0c;后面是数字是你要设置的秒数 -a 取消命令&…

bilibili快速升满级(使用Docker 容器脚本)

部署bilibili升级运行容器脚本 docker run --name"bili" -v /bili/Logs:/app/Logs -e Ray_DailyTaskConfig__Cron"30 9 * * *" -e Ray_LiveLotteryTaskConfig__Cron"40 9 * * *" -e Ray_UnfollowBatchedTaskConfig__Cron"…

​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​

软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】 课本里章节里所有蓝色字体的思维导图

安防监控EasyCVR视频汇聚平台使用海康SDK播放时,画面播放缓慢该如何解决?

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同&#xff0c;支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。安防视频平台EasyCVR拓展性强&#xff0c;视频能力丰富&#xff0c;具体可实现视频监控直播、视频轮播、视频录像、云存储、…

node插件express(路由)的插件使用(二)——body-parser和ejs插件的基本使用

文章目录 前言一、express使用中间件body-parser获取请全体的数据1. 代码2. 效果 二、express使用ejs&#xff08;了解即可&#xff09;1.安装2.作用3.基本使用&#xff08;1&#xff09;代码&#xff08;2&#xff09;代码分析和效果 4.列表渲染&#xff08;1&#xff09;代码…

【算法| 差分 No.1】AcWing 797. 差分 AcWing 798. 差分矩阵

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望对大家有所帮…

时序预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost时间序列预测

时序预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost时间序列预测 目录 时序预测 | MATLAB实现基于SVM-Adaboost支持向量机结合AdaBoost时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab实现SVM-Adaboost时间序列预测&#xff08;风…

SpringBoot案例(数据层、业务层、表现层)

1.创建项目 2.选择坐标 3.添加坐标 说明&#xff1a;为了便于开发&#xff0c;引入了lombak坐标。 <!--添加mybatis-plus坐标--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><ver…

verilog 每日一练- 移位寄存器

module shift_1x64 (clk, shift,sr_in,sr_out,);input clk, shift;input sr_in;output sr_out;reg [63:0] sr;always(posedge clk)beginif (shift 1b1)beginsr[63:1] < sr[62:0];sr[0] < sr_in;endendassign sr_out sr[63];endmodule 这个Verilog模块 shift_1x64 实现了…

1、Sentinel基本应用限流规则(1)

Sentinel基本应用&限流规则 1.1 概述与作用 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。缓存、降级和限流是保护微服务系统运行稳定性的三大利器。 缓存&#xff1a;提升系统访问速度和增大系统能处理的容量 降级&#xff1a;当服务出问题或者影…

Linux cat命令

连接文件并打印输出到标准输出设备。cat 命令可以用来显示文本文件的内容&#xff08;类似于 DOS 下的 type 命令&#xff09;&#xff0c;也可以把几个文件内容附加到另一个文件中&#xff0c;即连接合并文件。 关于此命令&#xff0c;有人认为写 cat 命令的人是因为喜欢猫&am…

技术分享 | app自动化测试(Android)--触屏操作自动化

导入TouchAction Python 版本 from appium.webdriver.common.touch_action import TouchAction Java 版本 import io.appium.java_client.TouchAction; 常用的手势操作 press 按下 TouchAction 提供的常用的手势操作有如下操作&#xff1a; press 按下 release 释放 …

[PHP]ShopXO企业级B2C免费开源商城系统 v2.3.1

ShopXO 企业级B2C免费开源电商系统&#xff01; 求实进取、创新专注、自主研发、国内领先企业级B2C电商系统解决方案。 遵循Apache2开源协议发布&#xff0c;无需授权、可商用、可二次开发、满足99%的电商运营需求。 PCH5、支付宝小程序、微信小程序、百度小程序、头条&抖音…

【JVM系列】- 挖掘·JVM堆内存结构

挖掘JVM堆内存结构 文章目录 挖掘JVM堆内存结构堆的核心概念堆的特点 堆的内存结构内存划分新生代/新生区&#xff08;Young Generation&#xff09;老年代&#xff08;Tenured Generation&#xff09;永久代&#xff08;或元数据区&#xff09;&#xff08;PermGen 或 MetaSpa…

水利部加快推进小型水库除险加固,大坝安全监测是重点

国务院常务会议明确到2025年前&#xff0c;完成新出现病险水库的除险加固&#xff0c;配套完善重点小型水库雨水情和安全监测设施&#xff0c;实现水库安全鉴定和除险加固常态化。 为加快推进小型水库除险加固前期工作&#xff0c;水利部协调财政部提前下达了2023年度中央补助…