说说FLINK细粒度滑动窗口如何处理

分析&回答

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。

下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。

我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);boolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {// 会话窗口的处理逻辑,略去} else {for (W window : elementWindows) {if (isWindowLate(window)) {continue;}isSkippedElement = false;windowState.setCurrentNamespace(window);windowState.add(element.getValue());triggerContext.key = key;triggerContext.window = window;TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents);}if (triggerResult.isPurge()) {windowState.clear();}registerCleanupTimer(window);}}// 最后是侧输出迟到数据的逻辑,略去}
复制代码

该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。

以下是SlidingEventTimeWindows.assignWindows()方法的源码。

    @Overridepublic Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List<TimeWindow> windows = new ArrayList<>((int) (size / slide));long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start = lastStart;start > timestamp - size;start -= slide) {windows.add(new TimeWindow(start, start + size));}return windows;} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}}public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;}
复制代码

这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。

代码读完了,有一个貌似稀松平常的需求:

以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。

直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

在官方文档Windows最后一节的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。

扯了这么多,有解决方案吗?

当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:

弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/112793.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postgresql-子查询

postgresql-子查询 简介派生表IN 操作符ALL 操作符ANY 操作符关联子查询横向子查询EXISTS 操作符 简介 子查询&#xff08;Subquery&#xff09;是指嵌套在其他 SELECT、INSERT、UPDATE 以及 DELETE 语句中的 查询语句。 子查询的作用与多表连接查询有点类似&#xff0c;也是为…

NTP时钟同步服务器

目录 一、什么是NTP&#xff1f; 二、计算机时间分类 三、NTP如何工作&#xff1f; 四、NTP时钟同步方式&#xff08;linux&#xff09; 五、时间同步实现软件&#xff08;既是客户端软件也是服务端软件&#xff09; 六、chrony时钟同步软件介绍 七、/etc/chrony.conf配置文件介…

JVM内存管理、内存分区:堆、方法区、虚拟机栈、本地方法栈、程序计数器

内存管理 内存分区 线程共享 堆 存放实例&#xff0c;字符串常量&#xff08;直接引用&#xff09;&#xff0c;静态变量&#xff0c;线程分配缓冲区&#xff08;TLAB线程私有&#xff09;。垃圾收集器管理的区域 方法区 非堆&#xff0c;和堆相对的概念。存储已被虚拟机加载的…

uniapp的 picker 日期时间选择器

效果图&#xff1a; dateTimePicker.js function withData(param){return param < 10 ? 0 param : param; } function getLoopArray(start,end){var start start || 0;var end end || 1;var array [];for (var i start; i < end; i) {array.push(withData(i))…

【ACM出版】第四届人工智能与计算工程国际学术会议(ICAICE 2023)

ACM出版|第四届人工智能与计算工程国际学术会议 The 4th International Conference on Artificial Intelligence and Computer Engineering 为了在人工智能技术应用与计算工程领域进一步的探索&#xff0c;与国内外学界和业界相关人员交流新问题、新发现、新成果、新应用&…

__call__函数

一、定义 在Python中&#xff0c;__call__函数是一个特殊的方法&#xff0c;用于使一个对象可以像函数一样被调用。当一个对象定义了__call__方法时&#xff0c;它就成为了一个可调用对象。 二、使用 class Counter:def __init__(self):self.count 0def __call__(self):sel…

查局域网所有占用IP

查局域网所有占用IP 按&#xff1a;winr 出现下面界面&#xff0c;在文本框中输入 cmd 按确定即可出现cmd命令界面 在cmd命令窗口输入你想要ping的网段&#xff0c;下面192.168.20.%i即为你想要ping的网段&#xff0c;%i代表0-255 for /L %i IN (1,1,254) DO ping -w 1 -n 1…

优化物料编码规则,提升物料管理效率

导 读 ( 文/ 2358 ) 物料是生产过程的必需品。对物料进行身份的唯一标识&#xff0c;可以更好的管理物料库存、库位&#xff0c;更方便的对物料进行追溯。通过编码规则的设计&#xff0c;可以对物料按照不同的属性、类别或特征进行分类&#xff0c;从而更好地进行库存分析、计划…

业务需要咨询?开发遇到 bug 想反馈?开发者在线提单功能上线!

大家是否遇到过下列问题—— 在开发的时候&#xff0c;遇到 bug 需要反馈… 有合作意向的时候&#xff0c;想更多了解业务和相关产品… 在接入的时候&#xff0c;需要得到专业技术支持… 别急&#xff0c;荣耀开发者服务平台在线提单功能上线了~ 处理问题分类说明&#xff1…

ElasticSearch(一)数据类型

ElasticSearch&#xff08;一&#xff09;数据类型 1.简述 Es数据类型分为基础数据类型和复杂类型数据&#xff0c;掌握ES数据类型才能进一步使用ES检索数据内容。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot…

阿里云申请免费SSL证书的两种验证方式及配置服务器Tomcat升级HTTPS协议

通用教程&#xff0c;其他服务商的免费 SSL 证书也差不多是这个流程。&#xff08;至少腾讯云的操作步骤和本文是一致&#xff0c;嘻嘻&#xff01;&#xff09; 申请 SSL 证书 首先在阿里云上创建并申请 SSL 证书&#xff0c;之后选择 DNS 验证的方式&#xff0c;一种是手动配…

【Go 基础篇】Go语言结构体详解:打开自定义类型的大门

嗨&#xff0c;Go语言学习者们&#xff01;在编程的世界里&#xff0c;数据是核心&#xff0c;而结构体&#xff08;Struct&#xff09;是一种能够帮助我们更有组织地存储和操作数据的重要工具。在本篇博客中&#xff0c;我们将深入探讨Go语言中结构体的概念、定义、初始化、嵌…

PYTHON链家租房数据分析:岭回归、LASSO、随机森林、XGBOOST、KERAS神经网络、KMEANS聚类、地理可视化...

全文下载链接:http://tecdat.cn/?p29480 作者&#xff1a;Xingsheng Yang 1 利用 python 爬取链家网公开的租房数据&#xff1b; 2 对租房信息进行分析&#xff0c;主要对房租相关特征进行分析&#xff0c;并搭建模型用于预测房租&#xff08;点击文末“阅读原文”获取完整代码…

自动化运维:Ansible脚本之playbook剧本

目录 一、理论 1.playbooks 2.YAML 3.使用ansible批量安装apache服务 4.定义、引用变量 5.指定远程主机sudo切换用户 6.when条件判断 7.迭代 8.Templates 模块 9.tags 模块 10.Roles 模块 二、实验 1.使用ansible批量安装apache服务 2.定义、引用变量…

k8s之存储篇---数据卷Volume

数据卷概述 Kubernetes Volume&#xff08;数据卷&#xff09;主要解决了如下两方面问题&#xff1a; 数据持久性&#xff1a;通常情况下&#xff0c;容器运行起来之后&#xff0c;写入到其文件系统的文件暂时性的。当容器崩溃后&#xff0c;kubelet 将会重启该容器&#xff…

深度学习4. 循环神经网络 – Recurrent Neural Network | RNN

目录 循环神经网络 – Recurrent Neural Network | RNN 为什么需要 RNN &#xff1f;独特价值是什么&#xff1f; RNN 的基本原理 RNN 的优化算法 RNN 到 LSTM – 长短期记忆网络 从 LSTM 到 GRU RNN 的应用和使用场景 总结 百度百科维基百科 循环神经网络 – Recurre…

无涯教程-Android - List View函数

Android ListView 是垂直滚动列表中显示的视图&#xff0c;使用 Adapter 从列表(如数组或数据库)中获取内容的列表项会自动插入列表中。 适配器(Adapter)实际上是UI组件和将数据填充到UI组件中的数据源之间的桥梁&#xff0c;适配器保存数据并将数据发送到适配器视图&#xff0…

75 # koa 基本逻辑实现以及属性的扩展

准备工作 新建自己的 kaimo-koa 文件夹&#xff0c;结构如下&#xff1a; lib application.js&#xff1a;创建应用context.js&#xff1a;上下文request.js&#xff1a;koa 中自己实现的 request 的对象response.js&#xff1a;koa 中自己实现的 response 的对象 package.js…

有序充电运营管理平台是基于物联网和大数据技术的充电设施管理系统-安科瑞黄安南

随着我国能源战略发展以及低碳行动的实施&#xff0c;电动汽车已逐步广泛应用&#xff0c;而电动汽车的应用非常符合当今社会对环保意识的要求&#xff0c;以及有效节省化石燃料的消耗。 由于其没有污染排放的优点以及政府部门的关注&#xff0c;电动汽车将成为以后出行的重要…

设计模式行为型-状态模式

文章目录 简介状态模式基础定义状态接口或抽象类实现具体状态类 上下文类与状态转换上下文类的定义和作用状态转换及触发条件 状态模式的优势与适用性优点一&#xff1a;可维护的代码优点二&#xff1a;清晰的状态管理适用场景一&#xff1a;对象拥有多个状态适用场景二&#x…