Flink时间语义和时间窗口

前言

在实际的流计算业务场景中,我们会发现,数据和数据的计算往往都和时间具有相关性。

举几个例子:

  • 直播间右上角通常会显示观看直播的人数,并且这个数字每隔一段时间就会更新一次,比如10秒。
  • 电商平台的商品列表,会显示商品过去24小时的销量、或者总销量
  • 阅读CSDN博客会显示总的阅读量,并且会持续更新

归纳总结可以发现,这些和时间相关的数据计算可以统一用一个计算模型来描述:每隔一段时间,计算过去一段时间内的数据,并输出结果。这个计算模型,就是时间窗口。

时间窗口类型

时间窗口计算模型具备三个重要的属性:

  • 时间窗口的计算频次,即 隔多久计算一次
  • 时间窗口的大小,即 计算过去多久的数据
  • 时间窗口内数据的处理逻辑

举例来说,每隔1分钟计算商品过去24小时的销量。时间窗口的计算频次就是1分钟,时间窗口的大小是24小时,窗口数据的处理逻辑是 对商品销量求和。

Flink 提供了三种时间窗口的类型

滚动窗口(Tumble Window)

滚动窗口的特点是:时间窗口大小和计算频次相同!

顾名思义,滚动窗口就像一个车轮一样滚滚向前,因为窗口大小和计算频次相同,所以窗口是紧密相连的,窗口内的数据不会重复计算。

举个例子,每隔1分钟计算商品过去1分钟的销量。

如下示例程序,每隔5秒计算过去5秒的订单销售额:

public class TumblingWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Order>() {@Overridepublic void run(SourceContext<Order> sourceContext) throws Exception {while (true) {Threads.sleep(1000);Order order = Order.mock();sourceContext.collectWithTimestamp(order, order.createTime);sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));}}@Overridepublic void cancel() {}}).keyBy(i -> i.itemId).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5L))).sum("orderAmount").print();environment.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Order {public String itemId;public long orderAmount;public long createTime;static Order mock() {return new Order("001", ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());}}
}

这里采用滚动窗口计算模型,窗口大小和计算频次均是5秒,运行作业后,控制台会每隔5秒输出一次总销售额

1> TumblingWindow.Order(itemId=001, orderAmount=250, createTime=1722344630342)
1> TumblingWindow.Order(itemId=001, orderAmount=270, createTime=1722344635388)
1> TumblingWindow.Order(itemId=001, orderAmount=147, createTime=1722344640407)
1> TumblingWindow.Order(itemId=001, orderAmount=253, createTime=1722344645430)
......

滑动窗口(Sliding Window)

滑动窗口的特点是:时间窗口大小和计算频次不相同,如果窗口大小大于计算频次,就会导致数据被重复计算;如果窗口大小小于计算频次,就会导致数据被漏计算;如果二者相等,那就是滚动窗口了。

举个例子,每隔1分钟计算商品过去1小时的销量。窗口大小为1小时,计算频次为1分钟,因此数据会被重复计算多次。

如下示例程序,每隔1秒计算过去5秒的订单销售额,部分订单会被重复计算多次:

public class SlidingWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<TumblingWindow.Order>() {@Overridepublic void run(SourceContext<TumblingWindow.Order> sourceContext) throws Exception {while (true) {Threads.sleep(1000);TumblingWindow.Order order = TumblingWindow.Order.mock();sourceContext.collectWithTimestamp(order, order.createTime);sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));}}@Overridepublic void cancel() {}}).keyBy(i -> i.itemId).window(SlidingEventTimeWindows.of(Duration.ofSeconds(5L), Duration.ofSeconds(1L))).sum("orderAmount").print();environment.execute();}
}

作业运行后,控制台每秒会输出一次过去5秒的销售额。

会话窗口(Session Window)

会话窗口的窗口大小和计算频次非常灵活,可以动态改变,每次都不一样。当窗口隔一段时间没有接收到新的数据,Flink就认为会话可以关闭并计算了,等下一次有新的数据进来,就会开启一个新的会话。这里的“隔一段时间”就是值会话窗口的间隔(Gap),这个间隔可以固定设置也可以动态设置。

举个例子,读书类APP都会有的一个功能,就是统计用户的阅读时长。用户必须有持续的动作,APP才会认为用户是真的在阅读,反之用户长时间没有操作,APP会认为用户已经离开,此时不会再统计阅读时长。

如下示例,随机5秒内模拟一次用户行为,会话窗口间隔设置为3秒,超过3秒认为用户离开,关闭窗口并统计用户阅读时长。

public class SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<UserAction>() {@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {while (true) {UserAction userAction = UserAction.mock();ctx.collectWithTimestamp(userAction, userAction.time);ctx.emitWatermark(new Watermark(System.currentTimeMillis()));// 随机5秒内 用户才会有新的操作Threads.sleep(ThreadLocalRandom.current().nextLong(0L, 5000L));}}@Overridepublic void cancel() {}})// 超过三秒没有收到用户新的动作,认为用户离开,关闭窗口并计算.windowAll(EventTimeSessionWindows.withGap(Duration.ofSeconds(3L))).aggregate(new AggregateFunction<UserAction, UserReadingTime, UserReadingTime>() {@Overridepublic UserReadingTime createAccumulator() {return new UserReadingTime();}@Overridepublic UserReadingTime add(UserAction userAction, UserReadingTime userReadingTime) {// 记录窗口内的用户阅读开始和结束时间userReadingTime.userId = userAction.userId;if (userReadingTime.startTime == 0L) {userReadingTime.startTime = userAction.time;}userReadingTime.endTime = userAction.time;return userReadingTime;}@Overridepublic UserReadingTime getResult(UserReadingTime userReadingTime) {return userReadingTime;}@Overridepublic UserReadingTime merge(UserReadingTime userReadingTime, UserReadingTime acc1) {return null;}}).addSink(new SinkFunction<UserReadingTime>() {@Overridepublic void invoke(UserReadingTime value, Context context) throws Exception {System.err.println("用户" + value.userId + " 阅读了 " + (value.endTime - value.startTime) + " ms");}});environment.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class UserAction {public Long userId;public long time;public static UserAction mock() {return new UserAction(1L, System.currentTimeMillis());}}@Data@AllArgsConstructor@NoArgsConstructorpublic static class UserReadingTime {public Long userId;public long startTime;public long endTime;}
}

运行Flink作业,控制台随机输出用户的阅读时长

用户1 阅读了 3240 ms
用户1 阅读了 9414 ms
用户1 阅读了 138 ms
用户1 阅读了 2960 ms

时间语义

时间语义和时间窗口息息相关。

Flink 提供了三种不同的时间语义,分别是:处理时间、事件时间、摄入时间。

在不同的时间语义下,针对同样的数据,Flink 分配的时间窗口是不一样的。

举个例子,我们要统计某个商品过去1分钟的销量,这是个典型的一分钟大小的时间窗口。用户在 09:00:50 下了一笔订单,中间由于网络延时等原因,Flink 在 09:01:01 才收到这笔订单数据,恰巧此时 Flink 因为自身作业压力宕机崩溃,在 09:02:10 才恢复作业,该笔订单数据随即被 keyBy 分组发送给下游算子处理。

这个例子中的三个时间点,刚好对应了 Flink 的三种时间语义:

  • 事件时间:事件发生的时间,通常数据本身会携带一个时间戳,即例子中的 09:00:50
  • 摄入时间:Flink 数据源接收数据的subTask算子本地时间,即例子中的 09:01:01
  • 处理时间:Flink 算子处理数据的机器本地时间,即例子中的 09:02:10

事件时间

事件时间是最常用的,在事件时间语义下,数据本身通常会携带一个时间戳,Flink 会根据该时间戳为数据分配正确的时间窗口。

因为事件时间是不会改变的,所以在事件时间语义下,Flink 窗口计算的结果始终是一致的,数据是清晰明确的。

但是,事件时间语义 会带来另一个问题。事件的产生是顺序的,但是数据在传输过程中,可能会因为网络拥塞等种种原因,到达 Flink 时乱序了。此时,Flink 如何处理这些乱序数据就是个麻烦事儿了。

举个例子,还是统计商品过去1分钟的销量,Flink 先是接收到事件时间为 09:00:30 的订单数据,此时将其分配到 [09:00,09:01] 窗口缓存起来,接着接收到了 09:01:30 的订单数据,此时 [09:00,09:01] 窗口可以关闭并计算了吗?显然不能,因为数据乱序到达的原因,谁也不能保证 Flink 待会不会收到 09:00 分钟产生的订单。

那怎么办呢?[09:00,09:01] 窗口总不能一直不关闭吧。为了解决这个问题,Flink 引入了 Watermark 机制,这里不做介绍。

使用事件时间对应的窗口分配器是:

  • TumblingEventTimeWindows 基于事件时间的滚动窗口
  • SlidingEventTimeWindows 基于事件时间的滑动窗口
  • EventTimeSessionWindows 基于事件时间的会话窗口

如下示例,每秒生成一个带时间戳的随机数,数据用 Flink 自带的 Tuple2 封装,同时用 TumblingEventTimeWindows 让 Flink 基于事件时间语义来分配 5秒 的滚动窗口。运行 Flink 作业,控制台每隔5秒会输出前5秒的随机数之和。

public class TumblingWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Tuple2<Long, Long>>() {@Overridepublic void run(SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {while (true) {Threads.sleep(1000);// f0是随机数 f1是时间戳Tuple2<Long, Long> tuple2 = new Tuple2<>(ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());sourceContext.collectWithTimestamp(tuple2, tuple2.f1);sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));}}@Overridepublic void cancel() {}}).windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5L))).sum(0).print();environment.execute();}
}

控制台输出

// subTask任务ID 数字和 时间戳
19> (108,1722432788302)
20> (308,1722432790305)
21> (324,1722432795346)

总结一下,如果业务要按照事件发生的时间计算结果或分析数据,那么只能选事件时间语义。通常情况下,事件时间也确实更有价值。例如,利用Flink分析用户的行为日志,用户具体在什么时间点做了哪些行为,会更有分析价值,至于 Flink 是什么时候处理这些日志的,对业务方来说并不重要。因为事件时间具有不变性,所以基于事件时间统计的结果总是清晰明确的,缺点是数据到达Flink是乱序的,处理迟到数据会给Flink带来一定的压力。

摄入时间

摄入时间是指数据到达 Flink Source 算子的本地机器时间,它为处理数据流提供了一种相对简单而直观的时间参考,算是在 事件时间 和 处理时间 中间做了一个折中。

摄入时间具备一定的优势。一方面,它避免了事件时间的乱序问题,相较于事件时间具备更高的处理效率;另一方面,相较于处理时间而言,它具备不变性,计算产生的结果也会更加准确。

摄入时间适用于那些对时间精度要求不是特别高,但又希望时间能够相对反映数据进入系统先后顺序的场景。

如下示例,使用摄入时间语义计算过去5秒窗口生成的随机数之和。因为用的是摄入时间,所以无须发送 Watermark,数据本身也无须携带时间戳。

public class IngestionTimeFeature {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 采用摄入时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);environment.addSource(new SourceFunction<Tuple1<Long>>() {@Overridepublic void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {while (true) {Threads.sleep(1000);sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));}}@Overridepublic void cancel() {}}).keyBy(IN -> "all").timeWindow(Time.of(5L, TimeUnit.SECONDS)).sum(0).print();environment.execute();}
}

处理时间

处理时间语义是指数据实际被处理的时间,也就是数据到达Window算子时subTask机器的本地时间。

因为 处理时间语义 完全依靠算子的机器本地时间,所以时间窗口在划分数据和触发计算,都只需要依靠本地时间来驱动,性能是最好的,延迟低,适用于对高性能和延迟敏感的业务。

同样的,处理时间语义也有它的劣势。因为采用的是subTask算子的本地时间,所以数据的时间其实是具备不确定性的。举个例子,订单数据在 09:00:01 被算子接收,它会被分配到 [09:00,09:01]窗口,假设此时该subTask作业故障宕机,等到 09:10:00 才恢复,Flink 重新消费这条数据,它又会被分配到 [09:10,09:11] 窗口,产出的数据就会不一致。因此在使用处理时间语义时,要保证业务方能接受这种因为异常情况导致的计算结果不符合预期的场景。

如下示例,采用处理时间语义,因为是采用subTask本地时间,所以同样也不需要发送 Watermark。

public class ProcessTimeFeature {public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 采用处理时间语义environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);environment.addSource(new SourceFunction<Tuple1<Long>>() {@Overridepublic void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {while (true) {Threads.sleep(1000);sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));}}@Overridepublic void cancel() {}}).windowAll(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5L))).sum(0).print();environment.execute();}
}

尾巴

Flink 具有丰富的时间语义,包括事件时间、处理时间和摄入时间。事件时间基于数据本身携带的时间戳,处理时间基于系统处理数据的本地时钟,摄入时间则是数据进入 Flink Source算子的时间。

时间窗口是 Flink 处理流式数据的重要方式,Flink 提供了 滚动窗口、滑动窗口、会话窗口 三种窗口类型。滚动窗口有固定大小且不重叠,滑动窗口大小固定且可重叠,会话窗口根据数据间隔来划分。合理选择时间语义和时间窗口,能更准确有效地处理和分析流式数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/452310.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VSCode自搭建嵌入式环境的make构建工具选择

make构建工具即make.exe。 make.exe作为环境变量&#xff0c;和Makefile脚本同步协作&#xff0c;Makefile里面的语法规定了代码项目中多文件的编译顺序和编译规则。 ①MinGW-64&#xff1a;如果选择MinGW/bin文件目录下的mingw32-make.exe&#xff0c;将其重命名为make.exe&a…

2.cpp输入输出

cpp输入输出 1.cpp输入输出 1.cpp输入输出 项目中需要用到中文提示&#xff0c;需要去设置中更改字符编码为GBK&#xff0c;不然程序会乱码 注意&#xff1a;先设置编码格式&#xff0c;再创建工程 C 中的输入和输出&#xff08;I/O&#xff09;主要是通过标准库中的输入输出…

scala 抽象类

理解抽象类 抽象的定义 定义一个抽象类 &#xff1a;abstract class A {} idea实例 抽象类重写 idea实例 练习 1.abstract2.错3.abstract class A{}4.对

GROUP BY分组

1. 插入测试数据 INSERT INTO course (course_name,teacher_id) VALUES (毛概,1)&#xff0c; (线性代数,2)&#xff0c; (政治&#xff0c;3)&#xff0c; (程序设计语言,1)&#xff0c; (离散数学,2)&#xff0c; (编译技术,3)&#xff0c; (嵌入式基础,1)&#xff0c; (单片…

element plus中menu菜单技巧

我在使用element plus的menu&#xff08;侧边栏&#xff09;组件的过程中遇到了一些问题&#xff0c;就是menu编写样式和路由跳转&#xff0c;下面给大家分享以下&#xff0c;我是怎么解决的。 1.页面效果 我要实现的网站布局是这样的&#xff1a; 侧边栏折叠以后的效果&#…

C++数据结构-红黑树全面解读(进阶篇)

1.红黑树的概念 红黑树是一种二叉搜索树&#xff0c;但在每个结点上增加了一个存储位用于表示结点的颜色&#xff0c;这个颜色可以是红色的&#xff0c;也可以是黑色的&#xff0c;因此我们称之为红黑树。 红黑树通过对任何一条从根到叶子的路径上各个结点着色方式的限制&…

el-table表格里面有一条横线

表格里面 有一条横线&#xff0c; 出现原因&#xff1a;是自定义了表格头.使用了固定列&#xff08;fixed&#xff09;&#xff0c;定宽。就很难受。。。 添加样式文件&#xff1a; <style lang"scss" scoped>::v-deep {.el-table__fixed-right {height: 100%…

【STL】string类的使用

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;C、STL 目录 string类的介绍--为什么学习string类 一、string类的默认成员函数 构造函数(constructor) 析构函数(destructor) 赋值运算符重载operator 二…

java maven

参考链接 maven相关配置 maven依赖管理 依赖具有传递性。 maven依赖范围 maven的生命周期 分为三个相互独立的生命周期&#xff1a; 在执行对应生命周期的操作时&#xff0c;需要进行前面的操作。比如&#xff0c;执行打包install的时候&#xff0c;会执行test。

嵌入式入门学习——8基于Protues仿真Arduino+SSD1306液晶显示数字时钟

0 系列文章入口 嵌入式入门学习——0快速入门&#xff0c;Let‘s Do It&#xff01; SSD1306 1 Protues查找SSD1306器件并放置在画布&#xff0c;画好电气连接&#xff08;这里VCC和GND画反了&#xff0c;后面仿真出错我才看见&#xff0c;要是现实硬件估计就烧毁了&#xf…

抖音快手提取COOKIE双参软件-修行者编程技术网

抖音快手提取COOKIE双参软件-修行者编程技术网 我们在软件开发的过程中首先要知道&#xff0c;什么是ck&#xff0c;什么是双参数 为什么会有ck&#xff0c;ck是否存在算法在其中 UI代码工程展示

【火山引擎】调用火山大模型的方法 | SDK安装 | 配置 | 客户端初始化 | 设置

豆包 (Doubao) 是字节跳动研发的大规模预训练语言模型。 目录 1 安装 2 配置访问凭证 3 客户端初始化 4 设置地域和访问域名 5 设置超时/重试次数 1 安装 通过pip安装PYTHON SDK。 pip install volcengine-python-sdk[ark] 2 配置访问凭证 获取 API Key 访问凭证具体步…

【NOIP提高组】一元三次方程求解

【NOIP提高组】一元三次方程求解 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 有形如&#xff1a;ax3bx2cxd0 这样的一个一元三次方程。给出该方程中各项的系数(a&#xff0c;b&#xff0c;c&#xff0c;d均为实数)&#xff0c;并约定该方…

PyQt 入门教程(3)基础知识 | 3.1、使用QtDesigner创建.ui文件

文章目录 一、使用QtDesigner创建.ui文件1、创建.ui文件2、生成.py文件3、使用新生成的.py文件4、编辑新生成的.py文件 一、使用QtDesigner创建.ui文件 1、创建.ui文件 打开PyCharm&#xff0c;使用自定义外部工具QtDesigner创建mydialog.ui文件&#xff0c;如下&#xff1a; …

基于因果推理的强对流降水临近预报问题研究

我国地域辽阔&#xff0c;自然条件复杂&#xff0c;灾害性天气种类繁多&#xff0c;地区差异性大。雷雨大风、冰雹、短时强降水等强对流天气是造成经济损失、危害生命安全最严重的一类灾害性天气。由于强对流降水具有高强度、小空间尺度等特点&#xff0c;一直是气象预报领域的…

python爬虫技术实现酷我付费破解下载

python爬虫技术实现酷我付费破解下载 1.python编程环境 python解释器:pyhton3版本 代码编辑器:Vscode,PyCharm 2.实现爬虫程序过程 2.1浏览器访问网站的过程 在浏览器导航栏中输入域名并回车(在按下回车的那一瞬间浏览器向网站发送了一个http请求)当网站接收到请求后向…

【Vue】Vue3(1)

文章目录 1 Vue3简介2 Vue3带来了什么2.1 性能的提升2.2 源码的升级2.3 拥抱TypeScript2.4 新的特性 3 创建Vue3.0工程3.1 使用 vue-cli 创建3.2 使用 vite 创建3.3 main.js3.4 App.vue 4 常用 Composition API4.1 拉开序幕的setup4.1.1 setup函数的两种返回值4.1.2 注意点4.1.…

Python酷玩之旅_数据分析入门(matplotlib)

导览 前言matplotlib入门1. 简介1.1 Pairwise data1.2 Statistical distributions1.3 Gridded data1.4 Irregularly gridded data1.5 3D and volumetric data 2. 实践2.1 安装2.2 示例 结语系列回顾 前言 翻看日历&#xff0c;今年的日子已划到了2024年10月19日&#xff0c;今天…

【Linux-进程间通信】vscode使用通信引入匿名管道引入

一、新系统&#xff0c;新软件 1.新系统 哈喽宝子们&#xff0c;从今以后我们不再使用风靡一时的CentOS系统了&#xff0c;因为CentOS已经不在维护了&#xff0c;各大公司几乎也都从CentOS转入其他操作系统了&#xff1b;我们现在由原来的CentOS系统切换到最新的Ubuntu系统&a…

[LeetCode] 232. 用栈实现队列

请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开头…