Flink之窗口聚合算子

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId()).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 这里给一个初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return Tuple2.of("", 0);}// 在累加器中统计每个用户行为条数(来一条更新一次)@Overridepublic Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);return result;}// 将累加器中的更新结果给到getResult方法,输出@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}// 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);return merged;}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))// 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {/***@Param s 本次传入的key*@Param window 本次传入窗口的各种元信息*@Param input 本次输入的所有数据*@Param out 输出数据**/@Overridepublic void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {// 创建集合接收迭代器中的数据ArrayList<UserEvent2> userEvent2List = new ArrayList<>();// 遍历迭代器,也就是输入数据for (UserEvent2 userEvent2 : input) {// 将数据添加到集合中userEvent2List.add(userEvent2);}// 将集合中的数据根据用户行为发生事件进行排序Collections.sort(userEvent2List, new Comparator<UserEvent2>() {@Overridepublic int compare(UserEvent2 o1, UserEvent2 o2) {// 倒序排序return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());}});// 将每个用户行为发生时间最大的两条数据输出for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {out.collect(userEvent2List.get(i));}}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/157655.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何通过 NFTScan API 按照 NFT 合约地址检索数据?

在当前 NFT 市场还在不断扩张的背景下&#xff0c;各种 NFT 项目依旧是井喷式涌现&#xff0c;投资者和开发者都面临获取项目全貌数据的困境。公链上提取和处理大量的数据既费时又费力&#xff0c;缺乏全面的信息支持&#xff0c;将难以深入判断一个 NFT 项目的真实情况&#x…

react中ant.design框架配置动态路由

目录 什么是动态路由&#xff1f; 应用场景&#xff1a; ant.design动态路由如何配置&#xff1a; 首先&#xff1a;找到app.tsx文件 然后&#xff1a;找到menuHeaderRender 其次&#xff1a;修改menuHeaderRender为menuDataRender​编辑 最后&#xff1a;在箭头函数里re…

Jenkins集成newman

一、Docker环境准备 二、Jenkins环境准备 三、登录Jenkins 安装NodeJs插件 四、Jenkins全局工具配置Nodejs 五、创建Jenkins自由风格项目 构建步骤1&#xff1a;选择Execute NodeJS script构建步骤2&#xff1a;选择执行shell脚本 六、将postman相关的脚本、环境变量数据、全局…

VUE echarts 柱状图、折线图 双Y轴 显示

weekData: [“1周”,“2周”,“3周”,“4周”,“5周”,“6周”,“7周”,“8周”,“9周”,“10周”], //柱状图横轴 jdslData: [150, 220, 430, 360, 450, 680, 100, 450, 680, 200], // 折线图的数据 cyslData: [100, 200, 400, 300, 500, 500, 500, 450, 480, 400], // 柱状图…

python flask接口字段存在性校验函数(http接口字段校验)(返回提示缺少的字段信息)validate_fields()

文章目录 字段存在性校验示例 字段存在性校验 from flask import Flask, request, jsonifyapp Flask(__name__)def validate_fields(data, fields):missing_fields [field for field in fields if field not in data]if missing_fields:return False, f"缺少以下字段: …

算法解析:LeetCode——机器人碰撞和最低票价

摘要&#xff1a;本文由葡萄城技术团队原创并首发。转载请注明出处&#xff1a;葡萄城官网&#xff0c;葡萄城为开发者提供专业的开发工具、解决方案和服务&#xff0c;赋能开发者。 机器人碰撞 问题&#xff1a; 现有 n 个机器人&#xff0c;编号从 1 开始&#xff0c;每个…

ubuntu 安装jdk21开发环境

下载 wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.tar.gz 第二步&#xff1a;解压 tar -zxvf jdk-21_linux-x64_bin.tar.gz 第三步&#xff1a;移动 jdk-21 目录到 /usr/local/jdk21 第四步&#xff1a;配置环境变量 sudovim/etc/profile vim/etc/…

Excel 的单元格内容和单元格格式

文章目录 单元格内容单元格格式常规格式数字格式 单元格内容 文本&#xff1a;只要不是纯数字&#xff0c;Excel 都默认是文本格式。 在 Excel 中&#xff0c;逻辑值只有两个&#xff1a;True 和 False。 全选一片区域&#xff0c;按 Delet 键删除内容时&#xff0c;确实可以删…

VTK编译解决CMake的“could not find any instance of Visual Studio”的问题

1、在配置VTK源码编译的过程中&#xff0c;遇到报错 “CMake的“could not find any instance of Visual Studio””,cmake在编程找不到vs2017路径或者配置不全。 解决方案&#xff1a; 安装“Visual Studio Installer”&#xff1b; 1.检查是否安装 “使用C的桌面开发” 2.检…

ACP.复盘方法

复盘要怎么做的有水准&#xff0c;让领导满意&#xff0c;方式方法很重要。今天给你们安利5种复盘方法&#xff0c;保准你省事&#xff0c;领导还满意。 一、KPT复盘法 7月份年中一直在做和复盘相关的事&#xff0c;像公司的OKR复盘、年中战略规划&#xff0c;不过日常很多生…

力扣刷题 day43:10-13

1.完全平方数 给你一个整数 n &#xff0c;返回 和为 n 的完全平方数的最少数量 。 完全平方数 是一个整数&#xff0c;其值等于另一个整数的平方&#xff1b;换句话说&#xff0c;其值等于一个整数自乘的积。例如&#xff0c;1、4、9 和 16 都是完全平方数&#xff0c;而 3 …

SOLIDWORKS® 2024 新功能 - 3D CAD

1、 先前版本的兼容性 • 利用您订阅的 SOLIDWORKS&#xff0c;可将您的 SOLIDWORKS 设计作品保存为旧版本&#xff0c;与使用旧版本 SOLIDWORKS 的供应商无缝协作。 • 可将零件、装配体和工程图保存为新版本前两年之内的SOLIDWORKS 版本。 优点&#xff1a; 即使其他用户正…

内存空间的分配与回收之连续分配管理方式

1.连续分配管理方式 连续分配:指为用户进程分配的必须是一个连续的内存空间。 1.单一连续分配 在单一连续分配方式中&#xff0c;内存被分为系统区和用户区。系统区通常位于内存的低地址部分&#xff0c;用于存放操作系统相关数据;用户区用于存放用户进程相关数据。内存中只…

多服务器云探针源码(服务器云监控)/多服务器多节点_云监控程序python源码

源码简介&#xff1a; 多服务器云探针源码(服务器云监控),支持python多服务器多节点&#xff0c;云监控程序源码。它是一款很实用的云探针和服务器云监控程序源码。使用它可以帮助管理员能够快速监控和管理各种服务器和节点&#xff0c;实用性强。 源码链接&#xff1a; 网盘…

【机器学习】sklearn特征选择(feature selection)

文章目录 特征工程过滤法&#xff08;Filter&#xff09;方差过滤相关性过滤卡方过滤F验表互信息法小结 嵌入法&#xff08;Embedded&#xff09;包装法&#xff08;Wrapper&#xff09; 特征工程 特征提取(feature extraction)特征创造(feature creation)特征选择(feature se…

lv8 嵌入式开发-网络编程开发 17 套接字属性设置

1 基本概念 设置套接字的选项对套接字进行控制除了设置选项外&#xff0c;还可以获取选项选项的概念相当于属性&#xff0c;所以套接字选项也可说是套接字属性有些选项&#xff08;属性&#xff09;只可获取&#xff0c;不可设置&#xff1b;有些选项既可设置也可获取 2 选项…

Ansible概述和模块解释

一、Ansible概述 Ansible介绍 Ansible是一个基于Python开发的配置管理和应用部署工具&#xff0c;现在也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点&#xff0c;Pubbet和Saltstack能实现的功能&#xff0c;Ansible基本上都可以实现。 Ansible能做什么 Ansi…

呼吁社区共同维护Sui品牌和商标

Sui社区成员可以通过举报Sui商标和品牌资产的不当使用来帮助保护网络的信誉。Sui商标政策解释了logo和名称的可接受和不可接受的使用方式。这些展示代表Sui面向公众&#xff0c;而善意行为者的正确使用有助于维护Sui的声誉。 Sui网络在公众中享有良好声誉&#xff0c;Sui社区都…

GEO生信数据挖掘(七)差异基因分析

上节&#xff0c;我们使用结核病基因数据&#xff0c;做了一个数据预处理的实操案例。例子中结核类型&#xff0c;包括结核&#xff0c;潜隐进展&#xff0c;对照和潜隐&#xff0c;四个类别。本节延续上个数据&#xff0c;进行了差异分析。 差异分析 计算差异指标step12 加载…

API攻防-接口安全SOAPOpenAPIRESTful分类特征导入项目联动检测

文章目录 概述什么是接口&#xff1f; 1、API分类特征SOAP - WSDLWeb services 三种基本元素&#xff1a; OpenApi - Swagger UISpringboot Actuator 2、API检测流程Method&#xff1a;请求方法URL&#xff1a;唯一资源定位符Params&#xff1a;请求参数Authorization&#xff…