flink的窗口

目录

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

2.计数窗口(Count window)

2.按照窗口分配数据的规则分类

窗口API分类

API调用

窗口分配器器:

窗口函数

增量聚合函数:

全窗口函数

flink sql 窗口函数

窗口 | Apache Flink

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

    时间窗口以时间点定义窗口的开始和结束,因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据,触发计算输出结果,并关闭销毁窗口。

flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性;窗口的方法获取最大时间戳为end-1,因此窗口[start,end)  左开右闭;

@PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}@Overridepublic long maxTimestamp() {return end - 1;}
2.计数窗口(Count window)

计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。

3.全局窗口(Global Windows)

是计数窗口的底层实现,窗口分配器由GlobalWindows类提供,需要自定义触发器实现窗口的计算;

 stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max().aggregate(new AvgPv()).print();查看源代码,windou函数后见windowStrream时获取默认的触发器
@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖区触发器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 计数窗口底层采用全局窗口加计数器来实现public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}

2.按照窗口分配数据的规则分类

滚动窗口(Tumbling Window):窗口大小固定,窗口没有重叠;

滑动窗口 (Sliding Window):滑动窗口有重叠,也可以没有重叠,如果窗口size和滑动size相等,等于滚动窗口;

会话窗口 (Session Window):基于会话对窗口进行分组,与其他两个不同的是,会话窗口是借用会话窗口的超时失效机触发窗口计算,当数据到来后会开启一个窗口,如果在超时时间内有数据陆续到来,窗口不会关闭,反之会关闭;极端情况,如果数据总能在窗口超时时间到达前远远不断的到来,该窗口会一直开启不会关闭;

全局窗口 (Global Window):比较通用的窗口,该窗口会把数据分配到一个窗口中,窗口为全局有效,会把相同key的数据分配到同一个窗口中,默认不会触发计算,跟没有窗口一样,需要自定义触发器才能使用;

窗口API分类

窗口大的分类可以分为按键分区和非按键分区两种:按键分需要经过keyby操作,会把数据进行分发,实现负载均分,可以并行处理更大的数据量。而非按键分区窗口,相当于并行度为1,使用上直接调用windowall(),因此一般并不推荐使用;

stream
.keyby(...)  //流按键分区
.window(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流stream
.windowAll(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流

API调用

窗口操作包含两个重要的概念:窗口分配器(window Assigners)和窗口函数(window function)两部分;

窗口分配器用于构建窗口,确定窗口类型,确定数据划分哪一个窗口,窗口函数制定数据的计算规则;

窗口分配器器:

作用:窗口分配器用来划分窗口属于哪一个窗口;

窗口按照时间可以划分为:滚动、滑动和session,三种类型窗口;

窗口计数划分:滚动和滑动两种类型;

  eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函数

窗口函数按照计算特点可以分为增量计算和全量计算;

增量聚合函数:数据到达后立即计算,窗口只保存中间结果。效率高,性能好,但不够灵活。

全量聚合函数:缓存窗口的所有元素,触发后统一计算,效率低,但计算灵活。

增量聚合函数:

数据进入窗口会参与计算,窗口结束前只需要保留一个聚合后的状态值,内存压力小。

1.规约函数(ReduceFunction):数据保存留一个状态,输入类型和输出类型必须一致,来一条数据会处理将数据合并到状态中;

 stream.keyBy(r -> r.f0)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定义累加规则,窗口闭合时,向下游发送累加结果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();

sum、max、min等底层都是通过同名AggregateFunction实现(非下面的聚合函数),本质还是实现ReduceFunction结构重写了reduce方法;

2.聚合函数(AggrateFunction):在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活;

  // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {// 创建累加器return Tuple2.of(new HashSet<String>(), 0L);}@Overridepublic Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {// 属于本窗口的数据来一条累加一次,并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 窗口闭合时,增量聚合结束,将计算结果发送到下游return (double) accumulator.f1 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {return null;}}
全窗口函数

全窗口函数会将进入窗口的数据先进行缓存,然后在窗口关闭时一起计算,缓存数据会占用内存资源,如果一个窗口数据量太大时,可能出现内存溢出的问题;

全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数(processWindowFunction)两种;

窗口函数(windowFunction):老版本通用窗口接口,window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息,也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖,因此主键被弃用

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

处理窗口函数(processWindowFunction):是窗口API中最底层通用的窗口函数接口,可以获取到上问对象(context),实现为调用process方法传入自定义继承ProcessWindowFunction类;

input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}

注意:一般增量窗口函数和全量窗口函数可以一起使用,window().aggregate()方法可以传入两个函数,第一个采用增量聚合函数,第二个传入全量函数,这样数据在进入窗口会触发增量计算,窗口不会缓存数据。当窗口关闭触发计算时,结果数据穿度到全量计算,参数Iterable中一般只有一个数据;

aggregate(acct1,acct2)

flink sql 窗口函数

flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口,但还有一种累计窗口。

在flink1.13版本后flinksql支持累计窗口CUMULATE,可以实现没5分钟触发一次计算,输出当天的累计数据,使用样例

SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/364903.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络编程常见问题

1、TCP状态迁移图 2、TCP三次握手过程 2.1、握手流程 1、TCP服务器进程先创建传输控制块TCB&#xff0c;时刻准备接受客户进程的连接请求&#xff0c;此时服务器就进入了LISTEN&#xff08;监听&#xff09;状态&#xff1b; 2、TCP客户进程也是先创建传输控制块TCB&#xff…

Echarts地图实现:杭州市困难人数分布【动画滚动播放】

Echarts地图实现&#xff1a;杭州市困难人数分布 实现功能 杭州市地区以及散点图分布结合的形式数据展示动画轮播可进去杭州市下级地区可返回杭州市地图展示 效果预览 实现思路 使用ECharts的地图和散点图功能结合实现地区分布通过动画轮播展示数据变化实现下级地区数据的展…

搜索引擎的原理与相关知识

搜索引擎是一种网络服务&#xff0c;它通过互联网帮助用户找到所需的信息。搜索引擎的工作原理主要包括以下几个步骤&#xff1a; 网络爬虫&#xff08;Web Crawler&#xff09;&#xff1a;搜索引擎使用网络爬虫&#xff08;也称为蜘蛛或机器人&#xff09;来遍历互联网&#…

Hugging Face Accelerate 两个后端的故事:FSDP 与 DeepSpeed

社区中有两个流行的零冗余优化器 (Zero Redundancy Optimizer&#xff0c;ZeRO)算法实现&#xff0c;一个来自DeepSpeed&#xff0c;另一个来自PyTorch。Hugging FaceAccelerate对这两者都进行了集成并通过接口暴露出来&#xff0c;以供最终用户在训练/微调模型时自主选择其中之…

flask-socket的实践

1.长连接和短连接的由来 1&#xff09;TCP在真正的读写操作之前&#xff0c;server与client之间必须建立一个连接&#xff0c; 当读写操作完成后&#xff0c;双方不再需要这个连接时它们可以释放这个连接&#xff0c; 连接的建立通过三次握手&#xff0c;释放则需要四次握手…

java基于ssm+jsp 二手车交易网站

1用户功能模块 定金支付管理&#xff0c;在定金支付管理页面可以填写订单编号、车型、品牌、分类、车身颜色、售价、订金金额、付款日期、备注、用户名、姓名、联系方式、是否支付等信息&#xff0c;进行详情、修改&#xff0c;如图1所示。 图1定金支付管理界面图 预约到店管…

“论大数据处理架构及其应用”写作框架,软考高级,系统架构设计师

论文真题 大数据处理架构是专门用于处理和分析巨量复杂数据集的软件架构。它通常包括数据收集、存储、处理、分析和可视化等多个层面&#xff0c;旨在从海量、多样化的数据中提取有价值的信息。Lambda架构是大数据平台里最成熟、最稳定的架构&#xff0c;它是一种将批处理和流…

kicad第三方插件安装问题

在使用KICAD时想安装扩展内容&#xff0c;但是遇到下载失败&#xff0c;因为SSL connect error。 因为是公司网络&#xff0c;我也不是很懂&#xff0c;只能另寻他法。找到如下方法可以曲线救国。 第三方插件包目录 打开存放第三方插件存放目录&#xff0c;用于存放下载插件包…

通俗范畴论4 范畴的定义

注:由于CSDN无法显示本文章源文件的公式,因此部分下标、字母花体、箭头表示可能会不正常,请读者谅解 范畴的正式定义 上一节我们在没有引入范畴这个数学概念的情况下,直接体验了一个“苹果1”范畴,建立了一个对范畴的直观。本节我们正式学习范畴的定义和基本性质。 一个…

【WPF】Windows系统桌面应用程序编程开发新手入门-打造自己的小工具

电脑Windows系统上的桌面程序通常是用Visual Studio 开发工具编写出来的&#xff0c;有两种开发方式供选择&#xff0c;一种是WindowForm&#xff0c;简称WinForm&#xff0c;另一种是Windows Presentation Foundation&#xff0c;简称WPF&#xff0c;这里将学习WPF项目。 文章…

收银系统源码-千呼新零售【全场景收银】

千呼新零售2.0系统是零售行业连锁店一体化收银系统&#xff0c;包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体&#xff0c;线上线下数据全部打通。 适用于商超、便利店、水果、生鲜、母婴、服装、零食、百货、宠物等连锁店使用。 详细介绍请…

云计算-期末复习题-框架设计/选择/填空/简答(2)

目录 框架设计 1.负载分布架构 2.动态可扩展架构 3.弹性资源容量架构 4.服务负载均衡架构 5.云爆发结构 6.弹性磁盘供给结构 7.负载均衡的虚拟服务器实例架构 填空题/简答题 单选题 多选题 云计算期末复习部分练习题&#xff0c;包括最后的部分框架设计大题(只是部分…

C++ | Leetcode C++题解之第200题岛屿数量

题目&#xff1a; 题解&#xff1a; class Solution { private:void dfs(vector<vector<char>>& grid, int r, int c) {int nr grid.size();int nc grid[0].size();grid[r][c] 0;if (r - 1 > 0 && grid[r-1][c] 1) dfs(grid, r - 1, c);if (r …

基于HarmonyOS NEXT开发智能提醒助手

目录 目录 目录 前言 关于HarmonyOS NEXT 智能提醒助手需求分析 智能提醒助手设计 1、系统架构 2、功能模块 智能提醒助手的应用场景 智能提醒助手的竞争力 具体技术实现 未来展望 结束语 前言 随着智能设备的普及和物联网技术的飞速发展&#xff0c;人们对于智能…

云计算 | 期末梳理(下)

1.模运算 2. 拓展欧几里得算法 3.扩散和混淆、攻击的分类 香农的贡献:定义了理论安全性,提出扩散和混淆原则,奠定了密码学的理论基础。扩散:将每一位明文尽可能地散布到多个输出密文中去,以更隐蔽明文数字的统计特性。混淆:使密文的统计特性与明文密钥之间的关系尽量复杂…

混合专家模型(MoE)的前世今生

在文章《聊聊最近很火的混合专家模型&#xff08;MoE&#xff09;》中&#xff0c;我们简单介绍了MoE模型的定义和设计&#xff0c;并且比较了MoE和Dense模型的区别&#xff0c;今天我们继续来回顾一下MoE模型发展的历史和最新的发展现状。 从去年GPT-4发布至今&#xff0c;MoE…

初阶数据结构之堆讲解

本篇文章带大家学习的是堆&#xff0c;还请各位观众老爷给个三连 正片开始 堆的概念 如果有一个关键码的集合 K { &#xff0c; &#xff0c; &#xff0c; … &#xff0c; } &#xff0c;把它的所有元素按完全二叉树的顺序存储方式存储 在一个一维数组中&#xff0c;并满…

查看Windows启动时长

&#xff08;附图片&#xff09;电脑自带检测开机时长---查看方式_电脑开机时长命令-CSDN博客 eventvwr - Windows日志 - 系统 - 查找 - 6013.jpg

SpringBoot(一)创建一个简单的SpringBoot工程

Spring框架常用注解简单介绍 SpringMVC常用注解简单介绍 SpringBoot&#xff08;一&#xff09;创建一个简单的SpringBoot工程 SpringBoot&#xff08;二&#xff09;SpringBoot多环境配置 SpringBoot&#xff08;三&#xff09;SpringBoot整合MyBatis SpringBoot&#xff08;四…

VMware中的三种虚拟网络模式

虚拟机网络模式 1 主机网络环境2 VMware中的三种虚拟网络模式2.1 桥接模式NAT模式仅主机模式网络模式选择1 VMware虚拟网络配置2 虚拟机选择网络模式3 Windows主机网络配置 配置静态IP 虚拟机联网方式为桥接模式&#xff0c;这种模式下&#xff0c;虚拟机通过主机的物理网卡&am…