0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

大纲

  • 滑动(Sliding)和滚动(Tumbling)的区别
  • 样例
    • 窗口为2,滑动距离为1
    • 窗口为3,滑动距离为1
    • 窗口为3,滑动距离为2
    • 窗口为3,滑动距离为3
  • 完整代码
  • 参考资料

在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。

滑动(Sliding)和滚动(Tumbling)的区别

正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。
在这里插入图片描述
而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。
在这里插入图片描述
它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。

样例

我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。

word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

窗口为2,滑动距离为1

count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。

    def count_window(self, size: int, slide: int = 0):"""Windows this KeyedStream into tumbling or sliding count windows.:param size: The size of the windows in number of elements.:param slide: The slide interval in number of elements... versionadded:: 1.16.0"""if slide == 0:return WindowedStream(self, CountTumblingWindowAssigner(size))else:return WindowedStream(self, CountSlidingWindowAssigner(size, slide))

我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。

    # reducingwindows_size = 2sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(E,2)
(E,2)
(E,2)
(E,2)
(E,2)

在这里插入图片描述

窗口为3,滑动距离为1

    # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)
(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为2

    # reducingwindows_size = 3sliding_size = 2reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为3

这个就等效于滚动窗口了,因为“滑”过了窗口大小。

    # reducingwindows_size = 3sliding_size = 3reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/177658.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Explaining and harnessing adversarial examples

Explaining and harnessing adversarial examples----《解释和利用对抗样本》 背景: 早期的研究工作认为神经网络容易受到对抗样本误导是由于其非线性特征和过拟合。 创新点: 该论文作者认为神经网络易受对抗性扰动影响的主要原因是它的线性本质&#xf…

stm32 模拟I2C

目录 简介 I2C 物理层 协议层 ①②:起始信号和结束信号 ③ 应答和非应答信号 ④数据有效性 ⑤数据传输 ⑥空闲状态 简介 I2C 物理层 一个 I2C 总线两条线组成,一个双向串行数据线SDA用来表示数据,一个串行时钟线SCL用于数据收发同步…

5个最流行的文本生成纹理AI工具

在线工具推荐: Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 3D场景编辑器 拥抱文本生成纹理AI模型改变游戏规则的力量,人工智能驱动的创新彻底改变了游戏开发中的资产创建。 这些出色的工具可将书面描述转换…

《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》阅读笔记

论文标题 《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》 干什么活:交通流预测(traffic flow forecasting )方法:动态图卷积网络(Dynamic Graph Convolutional Network)…

Ubuntu 使用 nginx 搭建 https 文件服务器

Ubuntu 使用 nginx 搭建 https 文件服务器 搭建步骤安装 nginx生成证书修改 config重启 nginx 搭建步骤 安装 nginx生成证书修改 config重启 nginx 安装 nginx apt 安装: sudo apt-get install nginx生成证书 使用 openssl 生成证书: 到对应的路径…

【Mybatis-Plus】常见的@table类注解

目录 引入Mybatis-Plus依赖 TableName 当实体类的类名在转成小写后和数据库表名相同时 当实体类的类名在转成小写后和数据库表名不相同时 Tableld TableField 当数据库字段名与实体类成员不一致 成员变量名以is开头,且是布尔值 ​编辑 成员变量名与数据库关…

IDEA中application.properties文件中文乱码

现象: 原因: 项目编码格式与IDEA编码格式不一致导致的 解决办法: 在File->Settings->Editor->File Encodings选项中,将Global Encoding,Project Encoding,Default encoding for properties files这三个选项置为一致&a…

使用 Curl 和 DomCrawler 下载抖音视频链接并存储到指定文件夹

项目需求 假设我们需要从抖音平台上下载一些特定的视频,以便进行分析、编辑或其他用途。为了实现这个目标,我们需要编写一个爬虫程序来获取抖音视频的链接,并将其保存到本地文件夹中。 目标分析 在开始编写爬虫之前,我们需要了…

QQ文件怎么恢复?3个方法解决文件丢失问题!

无论是在学习还是工作中,我们都有可能需要接触到QQ这款软件。QQ传输文件十分方便,因此仍然有许多小伙伴喜欢用QQ来发送各种类型的文件。对于大家来说,最害怕的莫过于重要的文件出现丢失的情况。 当我们发现QQ文件意外删除或者过期时该怎么办…

k8s之集群调度

目录 调度 工作机制 调度过程 调度算法 优先级 指定调度节点 调度 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作,保持数据同步的,每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件,向 APIServer 发送命令…

【java学习—十】操作集合的工具类Collections(8)

文章目录 1. 操作集合的工具类: Collections2. 应用3. 查找、替换3.1. max 与 min3.2. 根据Comparator返回max(min) 3.3. frequency 与 replaceAll4. 同步控制 1. 操作集合的工具类: Collections Collections 是一个操作 Set 、List 和 Map 等集合的工具…

嵌入式Linux系统的闪存设备和文件系统学习纪要

嵌入式Linux系统的闪存设备和文件系统学习纪要 Linux下的文件系统结构如下: NAND Flash 是一种非易失性存储器(Non-Volatile Memory),常用于闪存设备和固态硬盘(SSD)中。以下是几种常见的 NAND Flash 种类&…

PTA 函数题(C语言)-- 阶乘计算升级版

题目title: 阶乘计算升级版 题目作者: 陈越 浙江大学 本题要求实现一个打印非负整数阶乘的函数。 函数接口定义: void Print_Factorial ( const int N ); 其中N是用户传入的参数,其值不超过1000。如果N是非负整数&#…

C#Onnx模型信息查看工具

效果 Netron效果 项目 代码 using Microsoft.ML.OnnxRuntime; using System; using System.Collections.Generic; using System.Text; using System.Windows.Forms;namespace Onnx_Demo {public partial class frmMain : Form{public frmMain(){InitializeComponent();}string…

VS2022 开发方式

使用 C# 在VS 2022 上开发时,发现有多种项目类型可以创建。这些类型放一起容易搞混,于是记录一下各种类型的区别。 这里主要介绍windows控制台程序、MFC程序、WPF程序、WinForm程序的特点。 创建哪种应用? 创建控制台应用 Windows控制台程序…

【数据挖掘 | 数据预处理】缺失值处理 重复值处理 文本处理 确定不来看看?

🤵‍♂️ 个人主页: AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!&…

Centos虚拟机安装配置与MobaXterm工具及Linux常用命令

目录 一、Centos操作系统 1.1 Centos介绍 1.2 Centos虚拟机安装 1.3 配置centos的镜像 1.4 虚拟机开机初始设置 1.4.1 查看网络配置 1.4.2 编辑网络配置 二、MobaXterm工具 2.1 MobaXterm介绍 2.2 MobaXterm安装 2.3 切换国内源 三、Linux常用命令和模式 3.1 查看网络配置 …

“2024中国电子信息展会“百年历史展会,4月深圳,7月成都,11月上海

2024年中国电子信息博览会,将如一位游历全国的使者,跨越千山万水,让人们见证中国电子信息产业的辉煌成就。它的足迹将遍布全国多个地区,4月走进繁花似锦的深圳,7月拥抱历史悠久的成都,11月则落脚国际化的上…

CloudCompare 二次开发(20)——二次曲面拟合

目录 一、概述二、代码集成三、结果展示本文由CSDN点云侠原创,原文链接。爬虫网站自重。 一、概述 由CloudCompare——点云二次曲面拟合一文知:CloudCompare软件中的已经集成了二次曲面拟合功能,但是计算出来的拟合参数是不正确的。因此,本文在原有算法的基础上进行修改,…

LSF 概览——了解 LSF 是如何满足您的作业要求,并找到最佳资源来运行该作业的

LSF 概览 了解 LSF 是如何满足您的作业要求,并找到最佳资源来运行该作业的。 IBM Spectrum LSF ("LSF", load sharing facility 的简称) 软件是行业领先的企业级软件。LSF 将工作分散在现有的各种 IT 资源中,以创建共享的,可扩展…