【Flink系列四】Window及Watermark

3.1、window

在 Flink 中 Window 可以将无限流切分成有限流,是处理有限流的核心组件,现在 Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window)。

Flink中的窗口可以分成:滚动窗口(Tumbling Window,无重叠),滑动窗口(Sliding Window,可能有重叠),会话窗口(Session Window,活动间隙),全局窗口(Gobal Window)

3.1.1、Tumbling Windows 滚动窗口

滚动窗口的assigner分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

// 滚动event-time窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动processing-time窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.second(5))).<windowed transformation>(<window function>);// 长度为一天的滚动event-time窗口, 偏移量为-8小时
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

3.1.2、Sliding Windows滑动窗口

滑动窗口的assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(滑动步长window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

3.1.3、Session Windows 会话窗口

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

// 设置了固定间隔的event-time会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的event-time会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element)-> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔}))

3.1.4、Global Windows 全局窗口

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
3.1.5、Triggers窗口触发

Trigger决定了一个窗口(由window assigner定义)何时可以被window function处理。一般来说,watermark的时间戳>=window endTime并且在窗口内有数据,就会触发窗口的计算。每个WindowAssigner都有一个默认的Trigger。如果默认trigger无法满足需求,可以在trigger(...)调用中指定自定义的trigger。

  • onElement() 每次往 window 增加一个元素的时候都会触发
  • onEventTime() 当 event-time timer 被触发的时候会调用
  • onProcessingTime() 当 processing-time timer 被触发的时候会调用
  • onMerge() 对两个 trigger 的 state 进行 merge 操作
  • clear() window 销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

  • CONTINUE 不做任何事情
  • FIRE 触发 window
  • PURGE 清空整个 window 的元素并销毁窗口
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口

3.2、time和watermark

3.2.1、time

在 Flink 中 Time 可以分为三种Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系我们可以从下图中得知:

3.2.2、watermark

Flink提出了watermark,专门处理EventTime窗口计算,其本质其实就是一个时间戳。因为对于迟到数据late element,不可能一直无限期等待,必须有一个机制来保证一个特定的时间后,必须取触发window去进行计算,这种机制就是watermark

watermark本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。 Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。 Watermark的产生和Apache Flink内部处理逻辑如下图所示: 

目前Apache Flink 有两种生产Watermark的方式,如下:

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。 在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

参阅:Apache Flink 漫谈系列(03) - Watermark-阿里云开发者社区

我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 watermark 例子

3.2.3、迟到的数据

上面的 watermark 让我们能够应对乱序的数据,但是真实世界中我们没法得到一个完美的 watermark 数值 — 要么没法获取到,要么耗费太大,因此实际工作中我们会使用近似 watermark — 生成 watermark(t) 之后,还有较小的概率接受到时间戳 t 之前的数据,在 Flink 中将这些数据定义为 “late elements”, 同样我们可以在 window 中指定是允许延迟的最大时间(默认为 0),可以使用下面的代码进行设置

设置allowedLateness之后,迟来的数据同样可以触发窗口,进行输出,利用 Flink 的 side output 机制,我们可以获取到这些迟到的数据,使用方式如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/212563.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELK(四)—els基本操作

目录 elasticsearch基本概念RESTful API创建非结构化索引&#xff08;增&#xff09;创建空索引&#xff08;删&#xff09;删除索引&#xff08;改&#xff09;插入数据&#xff08;改&#xff09;数据更新&#xff08;查&#xff09;搜索数据&#xff08;id&#xff09;&…

倚天屠龙:Github Copilot vs Cursor

武林至尊&#xff0c;宝刀屠龙。号令天下&#xff0c;莫敢不从。倚天不出&#xff0c;谁与争锋&#xff01; 作为开发人员吃饭的家伙&#xff0c;一款好的开发工具对开发人员的帮助是无法估量的。还记得在学校读书的时候&#xff0c;当时流行CS架构的RAD&#xff0c;Delphi和V…

xcode swiftui项目添加依赖

打开项目targets——Build Phases 点击“” 属于Apple SDKs的依赖可以直接添加 其他依赖需要在 Add Other中添加&#xff0c;在右上角用名字搜索或者URL地址(如GitHub上插件的地址)搜索,然后添加&#xff0c;也可添加本地文件

USB Type-C一拖二线缆制作方法

1 实现方法 Figure 1-1 Type-C Socket(母口) Figure 1-2 Type-C Plug(公头) Table 1-1 Type-C Socket Pin连接描述 Type-C Plug连接&#xff0c; 需要做一个一拖二的线&#xff0c;一根的一端是USB&#xff0c; 另外一根的一端是USB转UART&#xff0c; 参考Table 1-2。 Table 1…

css 修改滚动条样式,解决Windows浏览器中滚动条不美观问题

Windows环境中的浏览器中滚动条默认是直接显示了&#xff0c;不管光标是否进入该区域&#xff0c;这样就很不美观&#xff0c;如下图&#xff1a; 之前样式为 .well {display: block;background-color: #f2f2f2;border: 1px solid #ccc;margin: 5px;width: calc(100% - 12px);h…

spark sql基于RBO的优化

前言 这里只对RBO优化进行简单的讲解。讲解RBO之前必须对spark sql的执行计划做一个简单的介绍。 这个里讲解的不是很清楚&#xff0c;需要结合具体的执行计划来进行查看 1、执行计划 在spark sql的执行计划中&#xff0c;执行计划分为两大类&#xff0c;即逻辑执行计划、物…

uniapp 打开文件管理器上传(H5、微信小程序、android app三端)文件

H5跟安卓APP 手机打开的效果图&#xff1a; Vue页面&#xff1a; <template><view class"content"><button click"uploadFiles">点击上传</button></view> </template><script>export default {data() {return…

云原生之深入解析Kubernetes策略引擎对比:OPA/Gatekeeper与Kyverno

一、前言 ① Kubernetes 策略 Kubernetes 的 Pod Security Policy&#xff0c;正如其名字所暗示的&#xff0c;仅是针对 Pod 工作的&#xff0c;是一种用来验证和控制 Pod 及其属性的机制。另外 PSP 只能屏蔽非法 Pod 的创建&#xff0c;无法执行任何补救/纠正措施。而 Gatek…

http与apache

目录 1.http相关概念 2.http请求的完整过程 3.访问浏览器背后的原理过程 4.动态页面与静态页面区别 静态页面&#xff1a; 动态页面&#xff1a; 5.http协议版本 6.http请求方法 7.HTTP协议报文格式 8.http响应状态码 1xx&#xff1a;提示信息 2xx&#xff1a;成功…

【C++11并发】Atomic 笔记

简介 用atomic定义的变量&#xff0c;支持原子操作&#xff0c;即要么全部完成操作&#xff0c;要不全部没有完成&#xff0c;我们是不可能看到中间状态。一般在多线程程序中&#xff0c;可以用atomic来完成数据同步。 标准库为我们主要提供了四类工具 atomic类模板操作atomi…

tomcat启动 页面报错404(3种解决方案)

1.端口号被占用 修改配置文件下的端口号&#xff1a;\apache-tomcat-8.5.57\conf\server.xml 我这里修改端口号为&#xff1a;9999 修改后页面输入&#xff1a;http://localhost:9999/ 2.没有配置环境变量 配置环境变量请查看&#xff1a;保姆级教程 3.查看下 apache-tomca…

SpringBoot Seata 死锁问题排查

现象描述&#xff1a;Spring Boot项目&#xff0c;启动的时候卡住了&#xff0c;一直卡在那里不动&#xff0c;没有报错&#xff0c;也没有日志输出 但是&#xff0c;奇怪的是&#xff0c;本地可以正常启动 好吧&#xff0c;姑且先不深究为什么本地可以启动而部署到服务器上就无…

【Flink系列五】Checkpoint及Barrier原理

本章内容 一致性检查点从检查点恢复状态检查点实现算法-barrier保存点Savepoint状态后端&#xff08;state backend&#xff09; 本文先设置一个前提&#xff0c;流处理的数据都是可回放的&#xff08;可以理解成消费的kafka的数据&#xff09; 一致性检查点&#xff08;che…

单片机第三季-第四课:STM32下载、MDK和调试器

目录 1&#xff0c;扩展板使用的STM32芯片类型 2&#xff0c;使用普中科技软件下载程序 3&#xff0c;keil介绍 4&#xff0c;JLINK调试器介绍 5&#xff0c;使用普中的调试器进行debug 6&#xff0c;使用Simulator仿真 1&#xff0c;扩展板使用的STM32芯片类型 扩展版…

如何用docker在自己服务器上部署springboot项目

一、将springboot项目打包 1、maven clean项目 2、maven package项目 打包成功之后生成jar文件&#xff08;在target目录下&#xff09; 3、为Java创建Dockerfile 引入jdk8的Docker镜像 FROM openjdk:8 为了使运行其余命令时更容易&#xff0c;让我们设置映像的工作目录。这将…

1、初识 llvm源码编译 及virtualbox和ubuntu环境搭建

很久没更新了&#xff0c;最近准备研究逆向和加固&#xff0c;于是跟着看雪hanbing老师学习彻底搞懂ollvm&#xff0c;终于把所有流程跑通了&#xff0c;中间遇到了太多的坑&#xff0c;所以必须记录一下&#xff0c;能避免自己和帮助他人最好。 环境搭建太重要了&#xff0c;…

26、pytest使用allure解读

官方实例 # content of pytest_quick_start_test.py import allurepytestmark [allure.epic("My first epic"), allure.feature("Quick start feature")]allure.id(1) allure.story("Simple story") allure.title("test_allure_simple_te…

LabVIEW使用单板RIO开发远程监控电源信号

LabVIEW使用单板RIO开发远程监控电源信号 设计和构建用于智能电网的本地功耗分析系统&#xff0c;主要服务于领先的电力监控设备设计者和制造商。随着智能电网投资的增加&#xff0c;对于能够有效处理替代电源&#xff08;如太阳能和风能&#xff09;间歇性功率水平的技术需求…

vue 提交表单重复点击,重复提交防抖问题

问题&#xff1a;用户点击保存时&#xff0c;可能会多次点击。导致生成重复数据。 目标&#xff1a;多次点击时&#xff0c;1s内只允许提交一次数据。 解决方案&#xff1a; 1.在utils文件夹创建文件preventReClick.js export default {install (Vue) {// 防止按钮重复点击V…

lv11 嵌入式开发 IIC(上) 19

目录 1 IIC总线简介 1.1 串行、半双工&#xff08;同时只能1收或者1发&#xff09; 1.2 IIC总线通信过程 2 IIC总线信号实现 2.1 IIC总线寻址方式 2.2 起始信号和停止信号 2.3 字节传送与应答 2.4 同步信号 2.5 典型IIC时序 3 练习 1 IIC总线简介 1.1 串行、半双工&a…