【Flink】窗口(Window)

窗口理解

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

对窗口的正确理解
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
在这里插入图片描述

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

窗口的分类

按照驱动类型分

时间窗口(Time Window)

以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车

计数窗口(Count Window)

计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车

按照窗口分配数据的规则分类

滚动窗口(Tumbling Window)

窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
在这里插入图片描述

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

滑动窗口(Sliding Windows)

滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
在这里插入图片描述

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
在这里插入图片描述

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);

全局窗口

这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
在这里插入图片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法

滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

stream.keyBy(...).countWindow(10)
滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...).countWindow(103)

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2个相同类型的输入参数public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window对象,还有其他东西:侧输出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}).print();env.execute();}
}

增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/198234.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nn.KLDivLoss,nn.CrossEntropyLoss,nn.MSELoss,Focal_Loss

KL loss&#xff1a;https://blog.csdn.net/qq_50001789/article/details/128974654 https://pytorch.org/docs/stable/nn.html 1. nn.L1Loss 1.1 公式 L1Loss: 计算预测 x和 目标y之间的平均绝对值误差MAE, 即L1损失&#xff1a; l o s s 1 n ∑ i 1 , . . . n ∣ x i…

【C++入门到精通】新的类功能 | 可变参数模板 C++11 [ C++入门 ]

阅读导航 引言一、新的类功能1. 默认成员函数2. 类成员变量初始化3. 强制生成默认函数的关键字default4. 禁止生成默认函数的关键字delete5. override 和 final&#xff08;1&#xff09;override&#xff08;2&#xff09;final 二、可变参数模板递归函数方式展开参数包逗号表…

读像火箭科学家一样思考笔记03_第一性原理(上)

1. 思维的两种障碍 1.1. 为什么知识会成为一种缺陷而非一种美德 1.1.1. 知识是一种美德 1.1.2. 知识同样的特质也会把它变成一种缺点 1.1.3. 知识确实是个好东西&#xff0c;但知识的作用应该是给人们提供信息&#xff0c;而不是起约束作用 1.1.4. 知识应该启发智慧&#…

Git精讲

Git基本操作 创建Git本地仓库 git initgit clone 配置Git git config [--global] user.name "Your Name" git config [--global] user.email "emailexample.com"–global是一个可选项。如果使用了该选项&#xff0c;表示这台机器上所有的Git仓库都会使…

6 Redis的慢查询配置

1、redis的命令执行流程 redis的慢查询只针对步骤3 默认情况下&#xff0c;慢查询的阈值是10ms 在配置文件中进行配置 //这个参数的单位为微秒 //如果将这个值设置为负数&#xff0c;则会禁用慢日志功能 //如果将其设置为0&#xff0c;则会强制记录每个命令 slowlog-log-slow…

【C++历练之路】list的重要接口||底层逻辑的三个封装以及模拟实现

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; &#x1f354;前言&#xff1a; 在C的世界中&#xff0c;有一种数据结构&#xff0c;它不仅像一个神奇的瑰宝匣&#xff0c;还像一位能够在数据的海洋中航行的智慧舵手。这就是C中的list&#xff0c;一个引人入胜的工具…

立仪科技光谱共焦在半导体领域的应用

半导体技术在近年来以极快的速度发展&#xff0c;对质量和精密度的要求也不断提升。在这样的背景下&#xff0c;用于材料与设备研究的先进检测技术如光谱共焦成像将自然地找到一席之地。下面我们将详细探讨一下光谱共焦在半导体领域中的应用。 光谱共焦技术&#xff0c;通过在细…

【DevOps】Git 图文详解(四):Git 使用入门

Git 图文详解&#xff08;四&#xff09;&#xff1a;Git 使用入门 1.创建仓库2.暂存区 add3.提交 commit 记录4.Git 的 “指针” 引用5.提交的唯一标识 id&#xff0c;HEAD~n 是什么意思&#xff1f;6.比较 diff 1.创建仓库 创建本地仓库的方法有两种&#xff1a; 一种是创建…

MongoDB之索引和聚合

文章目录 一、索引1、说明2、原理3、相关操作3.1、创建索引3.2、查看集合索引3.3、查看集合索引大小3.4、删除集合所有索引&#xff08;不包含_id索引&#xff09;3.5、删除集合指定索引 4、复合索引 二、聚合1、说明2、使用 总结 一、索引 1、说明 索引通常能够极大的提高查…

CSS的选择器(一篇文章齐全)

目录 Day26&#xff1a;CSS的选择器 1、CSS的引入方式 2、CSS的选择器 2.1 基本选择器​编辑 2.2 组合选择器 2.3 属性选择器 2.4 伪类选择器 2.5 样式继承 2.6 选择器优先级 3、CSS的属性操作 3.1 文本属性 3.2 背景属性 3.3 边框属性 3.4 列表属性 3.5 dispal…

Hive调优

1.参数配置优化 设定Hive参数有三种方式&#xff1a; &#xff08;1&#xff09;配置Hive文件 当修改配置Hive文件的设定后&#xff0c;对本机启动的所有Hive进程都有效&#xff0c;因此配置是全局性的。 一般地&#xff0c;Hive的配置文件包括两部分&#xff1a; a&#xff…

Node.js之TCP(net)

Hi I’m Shendi Node.js之TCP&#xff08;net&#xff09; 最近使用Nodejs编写程序&#xff0c;需要用到自己编写的分布式工具&#xff0c;于是需要将Java版的用NodeJs重新写一遍&#xff0c;需要使用到TCP通信&#xff0c;于是在这里记录下Node.js TCP 的使用方法 依赖 需要使…

【面试经典150 | 算术平方根】

文章目录 写在前面Tag题目来源解题思路方法一&#xff1a;数学表达式方法二&#xff1a;二分法 其他语言python3 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并…

Asp.net MVC Api项目搭建

整个解决方案按照分层思想来划分不同功能模块&#xff0c;以提供User服务的Api为需求&#xff0c;各个层次的具体实现如下所示&#xff1a; 1、新建数据库User表 数据库使用SQLExpress版本&#xff0c;表的定义如下所示&#xff1a; CREATE TABLE [dbo].[User] ([Id] …

YOLOv8改进 | 2023 | InnerIoU、InnerSIoU、InnerWIoU、FoucsIoU等损失函数

论文地址&#xff1a;官方Inner-IoU论文地址点击即可跳转 官方代码地址&#xff1a;官方代码地址-官方只放出了两种结合方式CIoU、SIoU 本位改进地址&#xff1a; 文末提供完整代码块-包括InnerEIoU、InnerCIoU、InnerDIoU等七种结合方式和其Focus变种 一、本文介绍 本文给…

手写消息队列(基于RabbitMQ)

一、什么是消息队列&#xff1f; 提到消息队列是否唤醒了你脑海深处的记忆&#xff1f;回看前面的这篇文章&#xff1a;《Java 多线程系列Ⅳ&#xff08;单例模式阻塞式队列定时器线程池&#xff09;》&#xff0c;其中我们在介绍阻塞队列时说过&#xff0c;阻塞队列最大的用途…

PWM实验

PWM相关概念 PWM:脉冲宽度调制定时器 脉冲&#xff1a;方波信号&#xff0c;高低电平变化产生方波 周期&#xff1a;高低电平变化所需要时间 频率&#xff1a;1s钟可以产生方波个数 占空比&#xff1a;在一个方波内&#xff0c;高电平占用的百分比 宽度调制&#xff1a;占…

开发知识点-uniapp微信小程序-开发指南

uniapp Vue的原型链生命周期函数onLoaduni.chooseLocationgetCurrentPages美团外卖微信小程序开发uniapp-美团外卖微信小程序开发P1 成果展示P2外卖小程序后端&#xff0c;学习给小程序写http接口P3 主界面配置P4 首页组件拆分P13 外卖列表布局筛选组件商家 布局测试数据创建样…

莹莹API管理系统源码附带两套模板

这是一个API后台管理系统的源码&#xff0c;可以自定义添加接口&#xff0c;并自带两个模板。 环境要求 PHP版本要求高于5.6且低于8.0&#xff0c;已测试通过的版本为7.4。 需要安装PHPSG11加密扩展。 已测试&#xff1a;宝塔/主机亲测成功搭建&#xff01; 安装说明 &am…

Flutter 中数据存储的四种方式

在 Flutter 中&#xff0c;存储是指用于本地和远程存储和管理数据的机制。以下是 Flutter 中不同存储选项的概述和示例。 Shared Preferences&#xff08;本地键值存储&#xff09; Shared Preferences 是一种在本地存储少量数据&#xff08;例如用户首选项或设置&#xff09…