0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/177592.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零信任网络:一种全新的网络安全架构

随着网络技术的不断发展,网络安全问题日益凸显。传统的网络安全策略往往基于信任和验证,但这种信任策略存在一定的局限性。为了解决这一问题,零信任网络作为一种全新的网络安全架构,逐渐受到人们的关注。本文将对零信任网络的概念…

qt 系列(二)---qt designer通过设置控件样式表进行背景颜色设置

1. 前言 一般Layouts不可以进行改变样式表,当我们想修改背景样式表,同时又不改变其他控件的颜色时,可以选择List View 控件改变背景颜色。 2. 设置背景 (1)配置 .qrc 文件 新建mypicture.qrc文件,记事本打…

虚拟机部署与发布J2EE项目(Linux版本)

🎬 艳艳耶✌️:个人主页 🔥 个人专栏 :《Spring与Mybatis集成整合》《Vue.js使用》 ⛺️ 越努力 ,越幸运。 1.jdk安装配置 打开虚拟机 Centos 登入账号,并且使用MobaXterm进行连接 1.1. 传入资源 连接…

Linux启动故障排错

Linux启动过程 开机流程、模块管理-CSDN博客 Grub三个阶段 1st stage:执行Grub主程序。Grub安装在MBR。由于MBR太小,所以与配置文件分开放1.5 stage:识别不同的文件系统2nd stage:加载Grub配置文件 /boot/grub2/grub.cfg。配置…

RISC-V IDE MRS无感远程协助模块详解

RISC-V IDE MRS无感远程协助模块详解 一、说明 1.1 概述 针对RISC-V/ARM等内核MCU的嵌入式集成开发环境MRS(MounRiver Studio)从V1.90版本开始内置无感远程协助模块(Sensorless Remote Assistant Module,以下简称SRA模块)。SRA模块是一款支…

《研发效能(DevOps)工程师》课程简介(一)丨IDCF

为贯彻落实《关于深化人才发展体制机制改革的意见》,推动实施人才强国战略,促进专业技术人员提升职业素养、补充新知识新技能,实现人力资源深度开发,推动经济社会全面发展,根据《中华人民共和国劳动法》有关规定&#…

AI技术发展:防范AI诈骗,守护数字安全

随着AI技术的迅猛发展,人工智能赋予了计算机更多的能力,包括自然语言处理、图像生成、声音合成等。这些领域的突破为人们提供了全新的体验和便捷,但同时也催生了一些潜在的安全风险,其中最突出的就是AI诈骗。本文将探讨如何防范AI…

入栏需看——管理类联考——英语——知识+记忆篇——导航页

文章目录 Section I Use of English——完型填空Section II Reading ComprehensionPart A——阅读理解 A 节(Part A)(四篇)Part B——阅读理解 B 节(Part B)(只有一篇)Part C——翻译…

vue基于ElementUI/Plus自定义的一些组件

vue3-my-ElementPlus 源码请到GitHub下载使用MyTable、MySelect、MyPagination 置顶|Top | 使用案例: 1.0 定义表格数据(测试使用) data() {return {tableData: [],value:[],valueList: [],}; },// 构造表格测试数据// 1 第一行&#xf…

成集云 | 英克ERP对接批发销售门店 | 解决方案

方案介绍 批发连锁门店是一种以批发销售为主,通过连锁经营方式进行管理的商业组织形态。它通常由一个总店或总公司负责管理和运营,下面拥有多个分店或加盟店,形成一个连锁经营网络。主要业务是向下游零售商或消费者销售商品,因此…

【NLP】什么是语义搜索以及如何实现 [Python、BERT、Elasticsearch]

语义搜索是一种先进的信息检索技术,旨在通过理解搜索查询和搜索内容的上下文和含义来提高搜索结果的准确性和相关性。与依赖于匹配特定单词或短语的传统基于关键字的搜索不同,语义搜索会考虑查询的意图、上下文和语义。 语义搜索在搜索结果的精度和相关…

AD9371 官方例程HDL JESD204B相关IP端口信号

AD9371 系列快速入口 AD9371ZCU102 移植到 ZCU106 : AD9371 官方例程构建及单音信号收发 ad9371_tx_jesd -->util_ad9371_xcvr接口映射: AD9371 官方例程之 tx_jesd 与 xcvr接口映射 AD9371 官方例程 时钟间的关系与生成 : AD9371 官方…

C语言实现 1.在一个二维数组中形成 n 阶矩阵,2.去掉靠边元素,生成新的 n-2 阶矩阵;3.求矩阵主对角线下元素之和:4.以方阵形式输出数组。

矩阵形式: 1 1 1 1 1 2 1 1 1 1 3 2 1 1 1 4 3 2 1 1 5 4 3 2 1 完整代码: /*编写以下函数 1.在一个二维数组中形成如以下形式的 n 阶矩阵: 1 1 1 1 1 2 1 1 1 1 3 2 1 1 1 4 3 2 1 1 5 4 3 2 1 2.去掉…

车载网络测试 - UDS诊断篇 - CAN与OSI七层模型

目录 为什么会介绍OSI七层模型? CAN规范与OSI模型 1、Physical Layer 1 2、Data Link Layer 2 3、Network Layer 3 & Transport Protocol Layer 4 4、Transport Protocol Layer 4 5、Session Layer 5 & Presentation Layer 6 & Application Laye…

0基础学习PyFlink——使用DataStream进行字数统计

大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API,则使用了类似的结构。 source 为了方便&…

深度学习之基于YoloV5火灾烟雾报警系统(GUI界面)

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、火灾烟雾报警系统四. 总结 一项目简介 YoloV5 是深度学习中用于目标检测的一种算法,可以对输入的图像进行识别,标识出…

【计算机网络笔记】传输层——可靠数据传输之流水线机制与滑动窗口协议

系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…

Linux的账号管理

本章的学习感觉如果不做系统管理员,作为简单了解就可以了 前面介绍了,用户,组,other三个角色, 每个用户创建都会有uid与之对应,创建的用户基本信息在一下两个文件中,也是我们要介绍的内容&…

【webrtc】 对视频质量的码率控制的测试与探索

目录 环境设置 transport-cc goog-remb (webrtc中的两种码率算法) 修改成remb算法 测试 效果 后续 可参考工程 环境设置 要到meshx上操作 telnet 112 然后执行factory_env show |grep meshx_ip 之后telnet meshx_ip 用户名admin 密码****.119 执行一下r…

Python自动化运维监控——批量监听页面发邮件(自由配置ini文件+smtplib)

一、程序样式 1.listen.ini配置文件 2.监控页面 3.日志 二、核心点 smtplib库:这里使用了smtp.qq.com与smtp.163.com两个发送邮件的地址,使用邮箱用户名与授权码来实现登录,端口都使用465,最后抛出异常,finally里…