【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows

是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。【事件驱动,没有数据到达永远都不会创建窗口】

1)窗口分类

(1)按照驱动类型分

(1)时间窗口

时间窗口以时间点来定义窗口的开始(start)和结束(end),截取出的就是某一时间段的数据。

(2)计数窗口

计数窗口基于元素的个数截取数据,到达固定的个数时就触发计算并关闭窗口。

(2)按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是
“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口应用非常广泛,可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

(2)滑动窗口(Sliding Windows)

滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率

滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size=slide)
滑动窗口适合计算结果更新频率非常高的场景。

(3)会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到
来的时间间隔(gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,
那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session)

在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

(4)全局窗口(Global Windows)

“全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯, 默认是不会做触发计算的,如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

2)窗口 API

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

(2)窗口分配器(Window Assigners)和窗口函数(WindowFunctions)

stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

窗口分配器

(1)时间窗口

滚动处理时间窗口

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

滑动处理时间窗口

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

处理时间会话窗口

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

(2)计数窗口

滚动计数窗口

stream.keyBy(...)
.countWindow(10)

滑动计数窗口

stream.keyBy(...)
.countWindow(10, 3)

全局窗口

stream.keyBy(...)
.window(GlobalWindows.create());

注意:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

窗口函数

(1)增量聚合函数(ReduceFunction / AggregateFunction)

归约函数(ReduceFunction)

类似Reduce算子,只不过固定时间才会输出

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);stream.map(new WaterSensorMapFunction()).keyBy(WaterSensor::getId)// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce 方法,之前的结果:" + value1 + ",现在来的数据:" + value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() + value2.getVc());}}).print();env.execute();

聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

image-20231109192227819

有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型IN 就是输入流中元素的数据类型;累加器类型 ACC 是进行聚合的中间状态类型;而输出类型OUT是最终计算结果的类型。

接口中有四个方法:

  • createAccumulator():创建一个累加器,为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

AggregateFunction 的工作原理:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(" 调用add方法,value=" + value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();

(2)全窗口函数(full window functions)

基于全部的数据计算

全窗口函数有两种:WindowFunction ProcessWindowFunction

窗口函数(WindowFunction)

基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

该类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用

处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。

时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor,String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();env.execute();

增量聚合和全窗口函数结合使用

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<TRKW>function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction 与 WindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,
ProcessWindowFunction<VRKW>     

结合使用

public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:/*增量聚合 Aggregate + 全窗口 process1、增量聚合函数处理数据: 来一条计算一条2、窗口触发时, 增量聚合的结果(只有一条)传递给全窗口函数3、经过全窗口函数的处理包装后,输出结合两者的优点:1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少2、全窗口函数: 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用 add 方法,value=" + value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用 getResult 方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用 merge 方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyyMM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}
}

Flink-Time

  • Event Time:事件时间,一个是数据产生的时间(时间戳Timestamp)
  • Processing time:处理时间,数据真正被处理的时间

image-20231108081425604

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/190457.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA 关闭SpringBoot启动Logo/图标

一、环境 1、SpringBoot 2.6.4 Maven POM格式 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.4</version><relativePath/></parent> 2、IDE…

OpenCV:图像噪点消除与滤波算法

人工智能的学习之路非常漫长&#xff0c;不少人因为学习路线不对或者学习内容不够专业而举步难行。不过别担心&#xff0c;我为大家整理了一份600多G的学习资源&#xff0c;基本上涵盖了人工智能学习的所有内容。点击下方链接,0元进群领取学习资源,让你的学习之路更加顺畅!记得…

【hcie-cloud】【3】华为云Stack规划设计之华为云Stack交付综述【上】

文章目录 前言华为云Stack交付综述交付流程华为云Stack交付流程华为云Stack安装部署流程 交付工具链华为云Stack交付工具链eDesigner - 让解决方案销售更智能eDesigner配置页面 - 基本信息eDesigner配置页面 - 服务及组网配置eDesigner配置页面 - 弹性云服务器/ECSeDesigner配置…

带头双向循环链表

文章目录 概述初始化销毁插入删除遍历打印 概述 带头双向循环链表&#xff1a;结构最复杂&#xff0c;一般用在单独存储数据。实际中使用的链表数据结构&#xff0c;都是带头双向循环链表。另外这个结构虽然结构复杂&#xff0c;但是使用代码实现以后会发现结构会带来很多优势…

11.读取文件长度-fseek和ftell函数的使用

文章目录 简介1. 写入测试文件2. 读取文件长度 简介 主要讲使用fopen读取文件&#xff0c;配合使用fseek和ftell来读取文件长度。1. 写入测试文件 执行下方程序&#xff0c;使用fwrite函数写入40字节的数据&#xff0c;使其形成文件存入本地目录。#define _CRT_SECURE_NO_WARNI…

CCF ChinaSoft 2023 论坛巡礼 | 编译技术与编译器设计论坛

2023年CCF中国软件大会&#xff08;CCF ChinaSoft 2023&#xff09;由CCF主办&#xff0c;CCF系统软件专委会、形式化方法专委会、软件工程专委会以及复旦大学联合承办&#xff0c;将于2023年12月1-3日在上海国际会议中心举行。 本次大会主题是“智能化软件创新推动数字经济与社…

JVS低代码表单自定义按钮的使用说明和操作示例

在普通的表单设计中&#xff0c;虽然自带的【提交】、【重置】、【取消】按钮可以满足基本操作需求&#xff0c;但在面对更多复杂的业务场景时&#xff0c;这些按钮的显示控制就显得有些力不从心。为了更好地满足用户在表单操作过程中的个性化需求&#xff0c;JVS低代码推出了表…

接口测试--知识问答

1 做接口测试当请求参数多时tps下降明显&#xff0c;此接口根据参数从redis中获取数据&#xff0c;每个参数与redis交互一次&#xff0c;当一组参数是tps5133&#xff0c;五组参数是tps1169&#xff0c;多次交互影响了处理性能&#xff0c;请详细阐述如何改进增进效果的方案。 …

软件外包开发的需求表达方法

软件开发需求的有效表达对于项目的成功至关重要。无论选择哪种需求表达方法&#xff0c;清晰、详细、易于理解是关键。与开发团队建立良好的沟通渠道&#xff0c;确保他们对需求有充分的理解&#xff0c;并随着项目的推进及时调整和更新需求文档。以下是一些常用的需求表达方法…

Django下的Race Condition漏洞

目录 环境搭建 无锁无事务的竞争攻击复现 无锁有事务的竞争攻击复现 悲观锁进行防御 乐观锁进行防御 环境搭建 首先我们安装源码包&#xff1a;GitHub - phith0n/race-condition-playground: Playground for Race Condition attack 然后将源码包上传到Ubuntu 为了方便使…

【Linux】虚拟机连不上外网 (ping www.baidu.com不通)

进入linux系统&#xff0c;打开终端&#xff0c;ping www.baidu.com 发现ping不通 首先我连接的是nat模式 查看是否连接上自己本机的网 切换root用户 使用 ifconfig 命令查看是eth0 还是 ens33 vi /etc/sysconfig/network-scripts/ifcfg-ens33 BOOTPROTOstatic ONBOOTyes …

openGauss学习笔记-122 openGauss 数据库管理-设置密态等值查询-密态支持函数/存储过程

文章目录 openGauss学习笔记-122 openGauss 数据库管理-设置密态等值查询-密态支持函数/存储过程122.1 创建并执行涉及加密列的函数/存储过程 openGauss学习笔记-122 openGauss 数据库管理-设置密态等值查询-密态支持函数/存储过程 密态支持函数/存储过程当前版本只支持sql和P…

带有密码的Excel只读模式,如何取消?

Excel文件打开之后发现是只读模式&#xff0c;想要退出只读模式&#xff0c;但是只读模式是带有密码的&#xff0c;该如何取消带有密码的excel只读文件呢&#xff1f; 带有密码的只读模式&#xff0c;是设置了excel文件的修改权限&#xff0c;取消修改权限&#xff0c;我们需要…

2.7 CE修改器:多级指针查找

在本步骤中&#xff0c;你需要使用多级指针的概念来查找健康值真正的地址并修改它。多级指针就是一个指针的指针&#xff0c;也就是第一个指针指向第二个指针&#xff0c;第二个指针指向第三个指针&#xff0c;以此类推&#xff0c;最终指向你想要访问的地址。 首先&#xff0…

MAC在Linux上上传本地文件压缩包(tomcat)解决方法(炒鸡详细)

要将文件压缩包上传到Linux云服务器&#xff0c;并在服务器上解压打开&#xff0c;你可以使用以下步骤&#xff1a; 在本地的Mac上&#xff0c;将要上传的文件或文件夹压缩成一个压缩包&#xff08;如zip或tar.gz格式&#xff09;。 使用SSH连接到Linux云服务器。你可以使用Te…

【深度学习实验】网络优化与正则化(三):随机梯度下降的改进——Adam算法详解(Adam≈梯度方向优化Momentum+自适应学习率RMSprop)

文章目录 一、实验介绍二、实验环境1. 配置虚拟环境2. 库版本介绍 三、实验内容0. 导入必要的库1. 随机梯度下降SGD算法a. PyTorch中的SGD优化器b. 使用SGD优化器的前馈神经网络 2.随机梯度下降的改进方法a. 学习率调整b. 梯度估计修正 3. 梯度估计修正&#xff1a;动量法Momen…

STM32--系统滴答SysTick

一、SysTick是什么&#xff1f; Systick定时器是一个24bit的倒计时&#xff08;向下计数&#xff09;定时器&#xff0c;功能就是实现简单的延时。 SysTick 是一种系统定时器&#xff0c;通常在嵌入式系统中使用。它是 ARM Cortex-M 处理器的一个特殊定时器&#xff0c;用于提…

基于Qt 多线程(继承自QThread篇)

# 简介 我们写的一个应用程序,应用程序跑起来后一般情况下只有一个线程,但是可能也有特殊情况。比如我们前面章节写的例程都跑起来后只有一个线程,就是程序的主线程。线程内的操作都是顺序执行的。恩,顺序执行?试着想一下,我们的程序顺序执行,假设我们的用户界面点击有某…

JavaScript_动态表格_删除功能

1、动态表格_删除功能 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>动态表格_添加和删除功能</title><style>table{border: 1px solid;margin: auto;width: 100%;}td,th{text-align: …

网络渗透测试(被动扫描)

被动扫描 主要是指的是在目标无法察觉的情况下进行信息搜集。在Google上进行人名的搜素就是一次被动扫描。最经典的被动扫描技术就是"Google Hacking"技术。由于Google退出中国&#xff0c;暂时无法使用。在此介绍三个优秀的信息搜集工具 被动扫描范围 1.企业网络…