0基础学习PyFlink——事件时间和运行时间的窗口

大纲

  • 定制策略
  • 运行策略
  • Reduce
  • 完整代码
  • 滑动窗口案例
  • 参考资料

在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTimeWindows)作为窗口的参考时间:

    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

而得到的结果也是不稳定的。
在这里插入图片描述
这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。
为了让结果稳定,我们可以不依赖运行时间(ProcessingTime),而使用不依赖于运行环境,只依赖于数据的事件时间(EventTime)。
一般,我们需要大数据处理的数据,往往存在一个字段用于标志该条数据的“顺序”。这个信息可以是单调递增的ID,也可以是不唯一的时间戳。我们可以将这类信息看做事件发生的时间。
那如何让输入的数据中的“事件时间”参与到窗口时长的计算中呢?这儿就要引入时间戳和Watermark(水位线)的概念。
假如我们把数据看成一张纸上的内容,水位线则是这张纸的背景。它并不影响纸上内容的表达,只是系统要用它来做更多的事情。
将数据中表达“顺序”的数据转换成时间戳,我们可以使用水位线单调递增时间戳分配器

定制策略

class ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])……       # define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())

for_monotonous_timestamps会分配一个水位线单调递增时间戳分配器,然后使用with_timestamp_assigner告知输入数据中“顺序”字段的值。这样系统就会根据这个字段的值生成一个单调递增的时间戳。这个时间戳相对顺序就和输入数据一样,是稳定的。
比如上图中,会分别用2,1,4,3……来计算时间戳。

运行策略

然后对原始数据使用该策略,这样source_with_wartermarks中的数据就包含了时间戳。

source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)

Reduce

这次我们使用TumblingEventTimeWindows,即事件时间(EventTime)窗口,而不是运行时间(ProcessingTime)窗口。

     # keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

多运行几次,结果是稳定输出的。
我们再多关注下TimeWindow中的start和end,它们是不重叠的、步长为2、左闭右开的区间。这个符合滚动窗口特性。

完整代码

from typing import Iterablefrom pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows, SlidingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssignerclass ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)# keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

滑动窗口案例

from typing import Iterablefrom pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
from pyflink.common.watermark_strategy import TimestampAssignerclass ElementTimestampAssigner(TimestampAssigner):def extract_timestamp(self, value, record_timestamp)-> int:return int(value[1])class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# define the watermark strategywatermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \.with_timestamp_assigner(ElementTimestampAssigner())source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)# keyingkeyed=source_with_wartermarks.key_by(lambda i: i[0]) # reducingreduced=keyed.window(SlidingEventTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 1) (‘E’, 2) TimeWindow(start=1, end=3)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 3) (‘E’, 4) TimeWindow(start=3, end=5)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 5) TimeWindow(start=5, end=7)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 7) (‘E’, 8) TimeWindow(start=7, end=9)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 9) (‘E’, 10) TimeWindow(start=9, end=11)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

通过TimeWindow的信息,我们看到这是一个步长为1、长度为2左闭右开的窗口。这个符合滑动窗口特点。

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/event-time/built_in/
  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/179383.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国际测试委员会BenchCouncil首发“开源系统杰出成果榜” 百度飞桨上榜

📕作者简介:热爱跑步的恒川,致力于C/C、Java、Python等多编程语言,热爱跑步,喜爱音乐的一位博主。 📗本文收录于恒川的日常汇报系列,大家有兴趣的可以看一看 📘相关专栏C语言初阶、C…

关于pytorch张量维度转换及张量运算

关于pytorch张量维度转换大全 1 tensor.view()2 tensor.reshape()3 tensor.squeeze()和tensor.unsqueeze()3.1 tensor.squeeze() 降维3.2 tensor.unsqueeze(idx)升维 4 tensor.permute()5 torch.cat([a,b],dim)6 torch.stack()7 torch.chunk()和torch.split()8 与tensor相乘运算…

RESTful接口实现与测试

目录标题 是什么?设计风格HTTP协议四种传参方式常用注解RequestBody与ResponseBodyRequestMapping注解RestController与ControllerPathVariable 与RequestParam 接受复杂嵌套对象参数Http数据转换的原理自定义HttpMessageConverter统一规划接口响应的数据格式实战&a…

为什么重写 redisTemplate

为什么重写 redisTemplate 1.安装 redis 上传 redis 的安装包tar -xvf redis-5.0.7.tar.gzyum -y install gcc-cmakemake PREFIX/soft/redis installcd /soft/redis/bin./redis-server redis.conf 2. 集成 redisTemplate maven 依赖 <dependency><groupId>org…

详解Java经典数据结构——HashMap

Java 的 HashMap 是一个常用的基于哈希表的数据结构&#xff0c;它实现了 Map 接口&#xff0c;可以存储键值对。下面我们进行详细介绍&#xff1a; 基本结构&#xff1a;HashMap 底层是基于哈希表来实现的&#xff0c;每次插入一个键值对时&#xff0c;会先对该键进行 Hash 运…

Locust:可能是一款最被低估的压测工具

01、Locust介绍 开源性能测试工具https://www.locust.io/&#xff0c;基于Python的性能压测工具&#xff0c;使用Python代码来定义用户行为&#xff0c;模拟百万计的并发用户访问。每个测试用户的行为由您定义&#xff0c;并且通过Web UI实时监控聚集过程。 压力发生器作为性…

本地部署Jellyfin影音服务器并实现远程访问影音库

文章目录 1. 前言2. Jellyfin服务网站搭建2.1. Jellyfin下载和安装2.2. Jellyfin网页测试 3.本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5. 结语 1. 前言 随着移动智能设备的普及&#xff0c;各种各样的使用需求也被开发出来&…

【Python3】【力扣题】219. 存在重复元素 II

【力扣题】题目描述&#xff1a; 【Python3】代码&#xff1a; 1、解题思路&#xff1a;哈希表。遍历每个元素&#xff0c;将元素及下标添加到字典&#xff0c;若当前元素已在字典中且下标之间距离k&#xff0c;则存在重复元素。 知识点&#xff1a;{}&#xff1a;创建空字典…

【OpenCV实现图像梯度,Canny边缘检测】

文章目录 概要图像梯度Canny边缘检测小结 概要 OpenCV中&#xff0c;可以使用各种函数实现图像梯度和Canny边缘检测&#xff0c;这些操作对于图像处理和分析非常重要。 图像梯度通常用于寻找图像中的边缘和轮廓。在OpenCV中&#xff0c;可以使用cv2.Sobel()函数计算图像的梯度…

都是80m²小户型,凭啥她家那么好看!福州中宅装饰,福州装修

杨桥新苑 本案来自杨桥新苑的住宅&#xff0c; 质朴弥漫在80㎡的小家&#xff0c; 自然淡雅的木纹&#xff0c;精炼的玄关隔断&#xff0c; 简约的设计里传达着中式的静谧风雅&#xff0c; 简练的空间加入中国元素&#xff0c; 让人从进门开始就沾染一丝艺术气息。 风格&a…

瑞禧生物分享~今天是 碲化银粉体 Ag2Te CAS:12002-99-2

碲化银粉体 Ag2Te CAS&#xff1a;12002-99-2 纯度&#xff1a;99% 仅用于科研 储藏条件&#xff1a;冷藏-20℃ 简介&#xff1a;碲化银是一种无机化合物&#xff0c;化学式是Ag2Te。它是一种单斜晶体&#xff0c;并以矿物的形式存在于自然界中。化学计量的碲化银具有n型半导…

AI:50-基于深度学习的柑橘类水果分类

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…

【Linux】Nignx及负载均衡动静分离

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《微信小程序开发实战》。&#x1f3af;&#x1f3a…

系列四、全局配置文件mybatis-config.xml

一、全局配置文件中的属性 mybatis全局配置中的文件非常多&#xff0c;主要有如下几个&#xff1a; properties&#xff08;属性&#xff09;settings&#xff08;全局配置参数&#xff09;typeAliases&#xff08;类型别名&#xff09;typeHandlers&#xff08;类型处理器&am…

服务上千家企业,矩阵通2.0重磅上线,全链路管理新媒体矩阵

自上线以来 矩阵通已服务了上千家企业级客户 覆盖汽车、家居、媒体、金融、教育等多个行业 矩阵通1.0时代 我们以“数据”为基座打造出10功能 帮助企业轻松管理新媒体矩阵 实现账号管理、数据分析、竞对监测、 人员考核、风险监管等需求 而现在 矩阵通2.0重磅上线 新增…

华纳云:centos系统中怎么查看cpu信息?

在CentOS系统中&#xff0c;我们可以使用一些命令来查看CPU的详细信息。下面介绍几个常用的命令&#xff1a; 1. lscpu lscpu命令可以显示CPU的架构、型号、核心数、线程数、频率等信息。 # lscpu 执行以上命令后&#xff0c;会输出类似以下内容&#xff1a; 2. cat /proc/…

配置OpenCV

Open CV中包含很多图像处理的算法&#xff0c;因此学会正确使用Open CV也是人脸识别研究的一项重要工作。在 VS2017中应用Open CV&#xff0c;需要进行手动配置&#xff0c;下面给出在VS2017中配置Open CV的详细步骤。 1.下载并安装OpenCV3.4.1与VS2017的软件。 2.配置Open CV环…

07、vue : 无法加载文件 C:\Users\JH\AppData\Roaming\npm\vue.ps1,因为在此系统上禁止运行脚本。

目录 问题解决&#xff1a; 问题 vue : 无法加载文件 C:\Users\JH\AppData\Roaming\npm\vue.ps1&#xff0c;因为在此系统上禁止运行脚本。 在使用 VSCode 时&#xff0c;创建 Vue 项目报的错 创建不了 Vue 项目 解决&#xff1a; 因为在此系统上禁止运行该脚本&#xff0…

【排序算法】 计数排序(非比较排序)详解!了解哈希思想!

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 算法—排序篇 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言&#x1f324;️计数排序的概念☁️什么是计数排序&#xff1f;☁️计数排序思想⭐绝对…

四川天蝶电子商务有限公司:短视频运营怎么样?

短视频是一种以短小精悍的内容来吸引用户的新型媒体形式&#xff0c;近年来在社交网络平台上迅速走红&#xff0c;成为当今互联网世界的新宠。然而&#xff0c;要想成功运营短视频&#xff0c;需要借助一系列的策略和技巧&#xff0c;通过精心的规划和执行&#xff0c;才能够吸…