0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

reduce

    # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/178339.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus接入AlterManager配置钉钉告警(基于K8S环境部署)

文章目录 一、钉钉群创建报警机器人二、安装Webhook-dingtalk插件三、配置Webhook-dingtalk插件对接钉钉群四、配置AlterManager告警发送至Webhook-dingtalk五、Prometheus接入AlterManager配置六、部署PrometheusAlterManager(放到一个Pod中)七、测试告警 注意:请基…

Redis安装-常用命令及操作

目录 一.Redis简介 二.redis安装 1.1安装Linux版本 1.2安装 windows版本 三.redis的常用命令 Redis哈希(Hash) 一.Redis简介 Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队…

SpringCloud(三) Ribbon负载均衡

SpringCloud(二) Eureka注册中心的使用-CSDN博客 在SpringCloud(二)中学习了如何通过Eureka实现服务的注册和发送,从而通过RestTemplate实现不同微服务之间的调用,加上LoadBalance注解之后实现负载均衡,那负载均衡的原理是什么呢? 目录 一, 负载均衡 1.1 负载均衡原理 1.2 源…

数据可视化篇——pyecharts模块

在之前的文章中我们已经介绍过爬虫采集到的数据用途之一就是用作可视化报表,而pyecharts作为Python中可视化工具的一大神器必然就受到广大程序员的喜爱。 一、什么是Echarts? ECharts 官方网站 : https://echarts.apache.org/zh/index.html ECharts 是…

UI动效的都可以用哪些工作来制作

随着UI设计的不断发展,UI动效越来越多地应用于现实生活中。手机,iPad、计算机、网页和其他设备被广泛使用,所以问题来了,为什么UI动态效果越来越被广泛使用?它的优点是什么?哪些软件可以设计UI动态效果&…

Apache ECharts简介和相关操作

文章目录 一、Apache ECharts介绍二、快速入门1.下载echarts.js文件2.新建index.html文件3.准备一个DOM容器用于显示图表4.完整代码展示5.相关配置 三、演示效果四、总结 一、Apache ECharts介绍 Apache ECharts 是一款基于 Javascript 的数据可视化图表库,提供直观…

3 — NLP 中的标记化:分解文本数据的艺术

一、说明 这是一个系列文章的第三篇文章, 文章前半部分分别是: 1 、NLP 的文本预处理技术 2、NLP文本预处理技术:词干提取和词形还原 在本文中,我们将介绍标记化主题。在开始之前,我建议您阅读我之前介绍的关…

最新版一媒体7.3、星媒体、皮皮剪辑,视频MD ,安卓手机剪辑去重神器+搬运脚本+去视频重软件工具

最新版一媒体app安卓版介绍: 这是一款功能强大的视频搬运工具,内置海量视频编辑工具,支持一键智能化处理、混剪、搬运、还能快速解析和去水印等等,超多实用功能等着您来体验! 老牌手机剪辑去重神器,用过的…

【Linux】安装使用Nginx负载均衡,并且部署前端项目

目录 一、Nginx概述 1. 什么 2. 背景 3. 作用 二、Nginx负载均衡 1. 讲述 2. 使用 1. 下载 2. 安装 3. 负载均衡 三、前端部署 1. 准备 2. 部署 一、Nginx概述 1. 什么 Nginx是一个高性能的开源Web服务器和反向代理服务器。它具有轻量级、高并发、低内存消耗的…

c++设计模式二:原型模式

使用场景:当需要构建多个相同的类对象时,而且该类对象结构较为复杂,如果每个都重新组织构建会很麻烦。 其实,就是写一个拷贝构造函数,或者写一个拷贝每个成员变量的clone()方法。 举例说明:比如一个相亲网站…

通过条件竞争实现内核提权

条件竞争漏洞(Race Condition Vulnerability)是一种在多线程或多进程并发执行时可能导致不正确行为或数据损坏的安全问题。这种漏洞通常发生在多个线程或进程试图访问和修改共享资源(如内存、文件、网络连接等)时,由于…

01背包问题

算法分析: 算法优化:(转为一维进行存储) : 代码实现:

Qt5 安装 phonon

Qt5 安装 phonon Qt5 安装 phonon问题描述安装组件 Qt5 安装 phonon 开发环境:Qt Creator 4.6.2 Based on Qt 5.9.6 问题描述 在运行 Qt5 项目时,显示错误: error: Unknown module(s) in QT: phonon这是缺少组件的原因,QT: pho…

微服务框架SpringcloudAlibaba+Nacos集成RabbitMQ

目前公司使用jeepluscloud版本,这个版本没有集成消息队列,这里记录一下,集成的过程;这个框架跟ruoyi的那个微服务版本结构一模一样,所以也可以快速上手。 1.项目结构图: 配置类的东西做成一个公共的模块 …

机器学习快速入门教程 Scikit-Learn实现

机器学习是什么? 机器学习是一帮计算机科学家想让计算机像人一样思考所研发出来的计算机理论。他们曾经说过,人和计算机其实本没有差别,同样都是一大批互相连接的信息传递和存储元素所组成的系统。所以有了这样的想法,加上他们得天独厚的数学功底,机器学习的前身也就孕育而生…

的修工单管理系统好用吗?工单系统应该怎么选?

在当今的数字化时代,企业运营效率的高低往往取决于其内部管理工具的先进性和实用性。工单管理系统作为企业运营中的重要工具,其作用日益凸显。市场上存在许多工单管理系统,但“的修”以其独特的产品差异化和优势,在竞争中独树一帜…

容器:软件性能测试的最佳环境

容器总体上提供了一种经济的和可扩展的方法来测试产品在实际情况下的性能,同时还能保持较低的资源成本和开销成本。 软件性能和可伸缩性是我们谈论应用程序开发时经常遇到的话题。一个很大的原因是应用程序的性能和可伸缩性直接影响其在市场上的成功。一个应用程序…

flink状态不能跨算子

背景 在flink中进行状态的维护和管理应该是我们经常做的事情,但是有些同学认为名称一样的状态在不同算子之间的状态是同一个,事实是这样吗? flink状态在保存点中的存放示意图 事实上,每个状态都归属于对应的算子,也…

外汇天眼:MAS下令星展银行暂停六个月的非必要活动

新加坡金融管理局(MAS)已经决定对星展银行有限公司(DBS Bank Ltd)的非必要IT变更进行为期六个月的暂停,以确保银行集中精力恢复其数字银行服务的弹性。在此期间,星展银行将不被允许开展新的业务&#xff0c…

python项目部署代码汇总:目标检测类、人体姿态类

一、AI健身计数 1、图片视频检测 (cpu运行): 注:左上角为fps,左下角为次数统计。 1.哑铃弯举:12,14,16 详细环境安装教程:pyqt5AI健身CPU实时检测mediapipe 可视化界面…