【Flink】Flink 中的时间和窗口之窗口其他API的使用

1. 窗口的其他API简介

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,可以更加灵活地控制窗口行为。

1.1 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的"触发计算"本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowsStream调用.trigger()方法,参数传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...).window(...).trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTriggerCountTrigger。所以一般情况下是不需要自定义触发器的。

Trigger是一个抽象类,自定义时必须实现以下四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这些方法参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。对于时间窗口(TimeWindow)而言,就是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。

另外这三个方法的返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。并且TriggerResult的返回结果可以让计算输出结果和关闭窗口分开执行。

示例:
在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建自定义数据源的实时流DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));stringDataStreamSource.keyBy(Event::getUrl).window(TumblingEventTimeWindows.of(Time.seconds(10))).trigger(new MyTrigger()).process(new WindowResult()).print();stringDataStreamSource.print("data");env.execute();}public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {@Overridepublic void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {out.collect(new UrlViewCount(s, elements.spliterator().getExactSizeIfKnown(), context.window().getStart(), context.window().getEnd()));}}private static class UrlViewCount {private String s;private long size;private long start;private long end;public UrlViewCount(String s, long size, long start, long end) {this.s = s;this.size = size;this.start = start;this.end = end;}@Overridepublic String toString() {return "UrlViewCount{" +"s='" + s + '\'' +", size=" + size +", start='" + start + '\'' +", end='" + end + '\'' +'}';}}public static class MyTrigger extends Trigger<Event, TimeWindow> {@Overridepublic TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));if (isFirstEvent.value() == null) {for (long i = window.getStart(); i < window.getEnd() ; i = i + 1000L) {ctx.registerEventTimeTimer(i);}isFirstEvent.update(true);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));isFirstEvent.clear();}}

输出结果:
在这里插入图片描述

1.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

1.3 允许延迟(Allowed Lateness)

在事件时间语义下,窗口会出现迟到的数据。之所以出现迟到数据是因为在乱序流中,水位线并一定能保证时间戳更早的所有数据不会再出现,当水位线已经到达窗口的结束时间时,窗口触发计算并输出结果,这时一般就要销毁窗口了;如果还有本该属于这个窗口的数据到达,默认情况下会被丢弃。

大多数情况下直接丢弃数据会导致统计结果不准,为了解决迟到数据的问题,Flink提供了一个特殊接口,可以为窗口算子设置一个"允许最大延迟"(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用.allowedLateness()方法,传入一个Time类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了。

从这里可以看到,窗口的触发计算(Fire)清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

1.4 将迟到的数据放入侧输出流

对于处理迟到数据,仅仅提供延迟时间还是会出现迟到的数据,所以Flink提供了另外一种方式处理迟到数据。可以将迟到数据放到侧输出流(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。

基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()SingleOutputStreamOperator的方法,获取到的侧输出流数据
类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/283410.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务高级篇(三):分布式缓存+Redis集群

文章目录 一、单点Redis的问题及解决方案二、Redis持久化2.1 单机安装Redis2.2 RDB持久化2.3 AOF持久化2.4 RDB和AOF对比 三、Redis主从3.1 搭建Redis主从架构3.1.1 集群结构3.1.2 准备实例和配置3.1.3 启动3.1.4 开启主从关系3.1.5 测试 3.2 数据同步3.2.1 全量同步【建立连接…

idea import的maven类报红

idea 报红/显示红色的原因 一般报红&#xff0c;显示红色&#xff0c;是因为 idea 在此路径下&#xff0c;找不到这个类。 找到是哪个 jar 包的类导致 idea 报红 点击报红的路径的上一层&#xff0c;进入jar 包。比如&#xff1a; import com.aaa.bbb.ccc.DddDto;这个 impo…

<c语言学习>结构体

结构体类型 为什么要有结构体 我们用c语言描述年龄时候&#xff0c;可以定义一个整形类型来实现&#xff1a; int age; age 18; printf("年龄为%d",age); (c语言描述年龄) 由于年龄这一属性比较单一&#xff0c;类似性别、某游戏角色攻击力、血量都可以用c语言内置…

亚马逊AWS展示高效纠错的全新量子比特!

亚马逊网络服务公司&#xff08;AWS&#xff09;在量子计算的纠错技术领域取得了显著成就&#xff0c;极大地简化了量子系统的复杂性和资源需求。他们的研究人员通过采用“双轨擦除”量子比特&#xff08;dual-rail erasure qubit&#xff09;技术&#xff0c;有效地克服了量子…

Flink入门知识点汇总(二)

具体内容请看b站尚硅谷课程&#xff01; 32_Flink运行时架构_提交流程_Yarn应用模式_哔哩哔哩_bilibili 窗口 Flink的窗口并不是静态准备好的&#xff0c;而是动态创建的。数据流到达时不会准备24个或者其他完整数量的桶&#xff0c;而是当下桶接满了&#xff0c;才临时又拿新…

(C语言)浮点数在内存中的存储详解

1. 浮点数 常见的浮点数&#xff1a;3.14159、 1E10等 &#xff0c;浮点数家族包括&#xff1a; float、double、long double 类型。 浮点数表示的范围&#xff1a; float.h 中定义. 2. 浮点数的存储 我们先来看一串代码&#xff1a; int main() {int n 9;float* pFloa…

分页多线程处理大批量数据

1.业务场景 因为需要从一个返利明细表中获取大量的数据&#xff0c;生成返利报告&#xff0c;耗时相对较久&#xff0c;作为后台任务执行。但是后台任务如果不用多线程处理&#xff0c;也会要很长时间才能处理完。 另外考虑到数据量大&#xff0c;不能一次查询所有数据在内存…

分布式Raft原理详解,从不同角色视角分析相关状态

分布式Raft原理详解&#xff0c;从不同角色视角分析相关状态 1. CAP定理2.Raft 要解决的问题3. Raft的核心逻辑3.1. Raft的核心逻辑2.1. 复制状态机2.2. 任期 Term2.3. 任期的意义&#xff1a;逻辑时钟2.4 选举定时器 3. Leader选举逻辑4. 从节点视角查看Leader选举4.1. Follow…

qt+ffmpeg 实现音视频播放(三)之视频播放

一、视频播放流程 &#xff08;PS&#xff1a;视频的播放流程跟音频的及其相似&#xff01;&#xff01;&#xff09; 1、打开视频文件 通过 avformat_open_input() 打开媒体文件并分配和初始化 AVFormatContext 结构体。 函数原型如下&#xff1a; int avformat_open_inpu…

ideaSSM 工程车辆人员管理系统bootstrap开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 idea 开发 SSM 工程车辆人员管理系统是一套完善的信息管理系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具 有完整的源代码和数据库&…

[AIGC] 在Spring Boot中指定请求体格式

在使用Spring Boot开发Web应用的时候&#xff0c;我们经常会遇到需要接收并处理HTTP请求的情况。一个HTTP请求通常包括一个请求行、若干请求头和一个请求体。请求体在POST和PUT请求中特别重要&#xff0c;因为它通常用于向服务器传递数据。 文章目录 创建并使用一个Java Bean指…

计算机二级Python基础操作题

题目来源&#xff1a;计算机二级Python半个月抱佛脚大法&#xff08;内呈上真题版&#xff09; - 知乎 第4&#xff0c;5&#xff0c;6&#xff0c;7&#xff0c;9&#xff0c;10&#xff0c;11套 1. 基础题1 sinput() print("{:\"^30x}".format(eval(s))) b …

【S5PV210】 | GPIO编程

【S5PV210】 | GPIO编程 时间:2024年3月17日22:02:32 目录 文章目录 【`S5PV210`】 | `GPIO`编程目录1.参考2.`DataSheet`2.1.概述2.1.1.特色2.1.2 输入/输出配置2.1.3 `S5PV210` 输入/输出类型2.1.4 IO驱动强度**2.1.4.1 类型A IO驱动强度****2.1.4.2 类型A IO驱动强度****2…

Selenium不同版本配置自动下载驱动及打包细节

Selenium配置浏览器驱动 自动下载浏览器驱动的方法 selenium4.7.0自动下载浏览器驱动的方法 selenium4.11.0 或4.11.1手动设置浏览器驱动路径的方法pyinstaller打包程序时同时打包ChromeDriverchromedriver路径需要sys._MEIPASS的路径进行引用方法一&#xff1a;通过–add-data…

LiveGBS流媒体平台GB/T28181功能-HTTPS 服务支持配置开启什么时候需要开启HTTPS测试SSL证书配置HTTPS测试证书

LiveGBS功能支持HTTPS 服务支持配置开启什么时候需要开启HTTPS测试SSL证书配置HTTPS测试证书 1、配置开启HTTPS1.1、准备https证书1.1.1、选择Nginx类型证书下载 1.2、配置 LiveCMS 开启 HTTPS1.2.1 web页面配置1.2.2 配置文件配置 2、HTTPS测试证书3、验证HTTPS服务4、为什么要…

安防监控视频汇聚平台EasyCVR接入海康Ehome设备,设备在线但视频无法播放是什么原因?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

python5:基于多进程的并发编程、基于协程的并发编程的学习笔记

进程 为什么要使用多进程&#xff1f;——GIL的存在&#xff0c;多线程实际不是并发执行 将任务分为两类&#xff1a;IO密集型&#xff08;多线程&#xff09;CPU密集型&#xff08;多进程&#xff09; 多进程的基本用法 concurrent.futures.process.ProcessPoolExecutor#进…

Airgorah:一款功能强大的WiFi安全审计工具

关于Airgorah Airgorah是一款功能强大的WiFi安全审计工具&#xff0c;该工具可以轻松发现和识别连接到无线接入点的客户端&#xff0c;并对特定的客户端执行身份验证攻击测试&#xff0c;捕捉WPA握手包&#xff0c;并尝试破解接入点的密码。在该工具的帮助下&#xff0c;广大研…

C语言联合体和枚举

前言 这篇博客就把剩下的两个自定义类型联合体和枚举好好总结一下&#xff0c;让我们好好看看联合体和枚举到底是什么 个人主页&#xff1a;小张同学zkf 若有问题 评论区见 感兴趣就关注一下吧 目录 1. 联合体 1.1 联合体类型的声明 1.2 联合体的特点 1.3 相同成员的结构体和联…

分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测

分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测 目录 分类预测 | Matlab实现PSO-KELM粒子群优化算法优化核极限学习机分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.MATLAB实现PSO-KELM粒子群优化算法优化核极限学习机分类预测(完整源…