Flink窗口分类简介及示例代码

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. 流式计算
      • 2. 窗口
      • 3. 窗口的分类
        • ◆ 基于时间的窗口(时间驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)
          • 3) 会话窗口(Session Windows)
        • ◆ 基于元素个数的(数据驱动)
          • 1) 滚动窗口(Tumbling Windows)
          • 2) 滑动窗口(Sliding Windows)

1. 流式计算

  Flink作为一个流式处理引擎,被设计用来处理无限数据集,理论上来说,无限数据集是一种不断产生,源源不断的数据集,说白了就是你不知道这个数据流它啥时候结束,这就是无限数据集。

  流式计算的思想是每来一个数据我就直接处理,而不用等,因此他非常适合在实时性要求比较高的场景下使用。

2. 窗口

  在流处理的场景下,如果我们想要统计过去某个时间段或过去多少条数据的指标时,就需要用到窗口,在Flink中,窗口(window)可以将流划分为有限块进行处理,Flink将这些有限的块抽象为“存储桶(bucket)”,我们可以在这些所谓的桶上做计算,也就实现了无限数据的有限计算。

  窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

  窗口的声明周期是:一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (可容忍的迟到时间)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(Global Windows)。 例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。 当 watermark 越过 12:06 时,这个窗口将被摧毁。关于窗口的详细介绍查看->官方对于窗口的介绍

3. 窗口的分类

◆ 基于时间的窗口(时间驱动)

1) 滚动窗口(Tumbling Windows)

window(TumblingProcessingTimeWindows.of(Time.seconds(10))),参数是时间滚动窗口大小。10秒滚动一个窗口

  滚动窗口将元素分发到指定大小的窗口。滚动窗口的大小是固定的,且个窗口之间没有空隙,不会重叠。比如说,如果你指定了滚动窗口的大小为5分钟,那么每5分钟就会有一个窗口被计算,且一个新的窗口被创建。如下图所示:
在这里插入图片描述
示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

自定义的工具类:

public class AnqclnUtil {// 要先声明泛型public static <T>List<T> toList(Iterable<T> elements) {List<T> list = new ArrayList<>();for (T t : elements) {list.add(t);}return list;}// 将long类型的时间转换为时间字符串public static String toDateTime(long ts) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(ts);}
}

运行结果:

在这里插入图片描述

2) 滑动窗口(Sliding Windows)

window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))),参数是时间滑动窗口大小和滑动距离。5秒滑动一个窗口,每个窗口最多放10个元素

  与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

  比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

3) 会话窗口(Session Windows)

window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))),参数是会话间隔,也就是多久没有活跃就关闭当前会话。4秒不活跃就关闭窗口。

  会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

示例代码:

public class Flink01_Window_Time {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId)// 定义一个长度为10s的滚动窗口 每隔10s滚动一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 定义一个滑动窗口:窗口长度是10s,滑动间隔5s   这种情况一个元素可能出现在多个窗口,因为有滑动
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))// 定义一个session窗口,时间间隔为4s  对于session窗口来说,不同的key出发时间不同,每个key都维护自己的session.window(ProcessingTimeSessionWindows.withGap(Time.seconds(4))).process(// 泛型的含义是:输入元素的类型,输出元素的类型,key的类型,窗口类型new ProcessWindowFunction<WaterSensor, Object, String, TimeWindow>() {// 在窗口关闭的时候触发一次@Overridepublic void process(String s, // key ,keyBy之后的keyContext context,  //上下文对象:里面封装了一些信息,比如窗口开始时间,结束时间,定时服务器...Iterable<WaterSensor> elements, // 存储了这个窗口内所有的元素Collector<Object> out) throws Exception {// 把Iterable中所有的元素取出并存入到 list 集合中List<WaterSensor> list = AnqclnUtil.toList(elements);// 获取窗口的相关信息,窗口开始时间和结束时间String startTime = AnqclnUtil.toDateTime(context.window().getStart());String endTime = AnqclnUtil.toDateTime(context.window().getEnd());out.collect("窗口:"+startTime+" "+endTime+" ,key:"+s + " ,list:"+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

◆ 基于元素个数的(数据驱动)

1) 滚动窗口(Tumbling Windows)

countWindow(3),参数是个数滚动窗口大小。3个元素滚动一个窗口

  每来多少个元素就滚动一次

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发.countWindow(3).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

2) 滑动窗口(Sliding Windows)

countWindow(3,2),参数是个数滑动窗口大小和滑动步长。每两个元素产生一个新的窗口,每个窗口最多放3个元素。

  就比滚动的多了个参数,滑动步长。步长是生成新窗口的条件,而窗口大小是指这个窗口最多能放多少个元素

示例代码:

public class Flink02_Window_Count {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(2);env.socketTextStream("hadoop101",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)// 定义长度为3的基于个数的滚动窗口 因为是keyBy之后的,所以key一样的不到三条不会触发
//                .countWindow(3)// 定义一个长度为3(窗口内元素的最大个数)  每来两个2个元素滑动一次,.countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s,Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = AnqclnUtil.toList(elements);out.collect(" key: "+s+" "+list);}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/89020.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Axure RP移动端高保真CRM办公客户管理系统原型模板及元件库

Axure RP移动端高保真CRM办公客户管理系统原型模板及元件库&#xff0c;一套典型的移动端办公工具型APP Axure RP原型模板&#xff0c;可根据实际的产品需求进行扩展&#xff0c;也可以作为移动端原型设计的参考案例。为提升本作品参考价值&#xff0c;在模板设计过程中尽量追求…

VVIC-据关键词取商品列表

一、接口参数说明&#xff1a; item_search-根据关键词取商品列表&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/vvic/item_search 名称类型必须描述keyString是调用key&#xff08;点击…

c++中的多态

文章目录 1.多态的概念1.1概念 2.多态的定义及实现2.1多态的构成条件2.2虚函数2.3虚函数的重写2.4 C11 override 和 final2.5 重载、覆盖(重写)、隐藏(重定义)的对比 3. 抽象类3.1概念3.2接口继承和实现继承 4.多态的原理4.1虚函数表4.2多态原理分析4.3 动态绑定与静态绑定 5.单…

仿到位|独立版家政上门预约服务小程序家政保洁师傅上门服务小程序上门服务在线派单源码

上门预约服务派单小程序家政 小程序 同城预约 开源代码 独立版. 程序完整,经过安装检测,可放心下载安装。 适合本地的一款上门预约服务小程序,功能丰富,适用多种场景。 程序功能:城市管理/小程序DIY/服务订单/师傅管理/会员卡功能/营销功能/文章功能等等

【深度学习 video detect】Towards High Performance Video Object Detection for Mobiles

文章目录 摘要IntroductionRevisiting Video Object Detection BaselinePractice for Mobiles Model Architecture for MobilesLight Flow 摘要 尽管在桌面GPU上取得了视频目标检测的最近成功&#xff0c;但其架构对于移动设备来说仍然过于沉重。目前尚不清楚在非常有限的计算…

【人工智能前沿弄潮】——生成式AI系列:Diffusers应用 (2) 训练扩散模型(无条件图像生成)

无条件图像生成是扩散模型的一种流行应用&#xff0c;它生成的图像看起来像用于训练的数据集中的图像。与文本或图像到图像模型不同&#xff0c;无条件图像生成不依赖于任何文本或图像。它只生成与其训练数据分布相似的图像。通常&#xff0c;通过在特定数据集上微调预训练模型…

jxls导出问题

![请添加图片描述](https://img-blog.csdnimg.cn/bc74c4207818491c93b75e19b3333451.png 为什么最后导出的文件还是按原样导出啊&#xff0c;没有填充数据 ![在这里插入图片描述](https://img-blog.csdnimg.cn/d4500b9a98c042f6b64a5d0650071303.png

云安全攻防(八)之 Docker Remote API 未授权访问逃逸

Docker Remote API 未授权访问逃逸 基础知识 Docker Remote API 是一个取代远程命令行界面&#xff08;rcli&#xff09;的REST API&#xff0c;其默认绑定2375端口&#xff0c;如管理员对其配置不当可导致未授权访问漏洞。攻击者利用 docker client 或者 http 直接请求就可以…

【PostgreSQL的CLOG解析】

同样还是这张图&#xff0c;之前发过shared_buffer和os cache、wal buffer和work mem的文章&#xff0c;今天的主题是图中的clog&#xff0c;即 commit log&#xff0c;PostgreSQL10之前放在数据库目录的pg_clog下面。PostgreSQL10之后修更名为xact,数据目录变更为pg_xact下面&…

Vue+SpringBoot项目开发:登录页面美化,登录功能实现(三)

写在开始:一个搬砖程序员的随缘记录上一章写了从零开始VueSpringBoot后台管理系统&#xff1a;Vue3TypeScript项目搭建 VueTypeScript的前端项目已经搭建完成了 这一章的内容是引入element-plus和axios实现页面的布局和前后端数据的串联&#xff0c;实现一个登陆的功能&#x…

CSS变形与动画(一):transform变形 与 transition过渡动画 详解(用法 + 代码 + 例子 + 效果)

文章目录 变形与动画transform 变形translate 位移scale 缩放rotate 旋转skew 倾斜多种变形设置变形中心点 transition 过渡动画多种属性变化 变形与动画 transform 变形 包括&#xff1a;位移、旋转、缩放、倾斜。 下面的方法都是transform里的&#xff0c;记得加上。 展示效…

Apache Maven:从构建到部署,一站式解决方案

目录 一、Maven介绍 1. Maven是什么&#xff1f; 2.Maven的作用&#xff1f; 二、Maven仓库介绍 2.1 库的分类 三、Maven安装与配置 3.1 Maven安装 3.2 Maven环境配置 3.3 仓库配置 四、Eclipse与Maven配置 五、Maven项目测试 5.1 新建Maven项目步骤及注意事项 5.…

C/C++test两步完成CMake项目静态分析

您可能一直在静态分析中使用CMake。但您是否尝试过将Parasoft C/Ctest与CMake一起使用吗&#xff1f;以下是如何使用C/Ctest在基于CMake的项目中运行静态分析的详细说明。 CMake是用于构建、测试和打包软件的最流行的工具之一。Parasoft C/Ctest通过简化构建管理过程&#xff…

RabbitMQ基础(2)——发布订阅/fanout模式 topic模式 rabbitmq回调确认 延迟队列(死信)设计

目录 引出点对点(simple)Work queues 一对多发布订阅/fanout模式以登陆验证码为例pom文件导包application.yml文件rabbitmq的配置生产者生成验证码&#xff0c;发送给交换机消费者消费验证码 topic模式配置类增加配置生产者发送信息进行发送控制台查看 rabbitmq回调确认配置类验…

Redis_缓存1_缓存类型

14.redis缓存 14.1简介 穿透型缓存&#xff1a; 缓存与后端数据交互在一起&#xff0c;对服务端的调用隐藏细节。如果从缓存中可以读到数据&#xff0c;就直接返回&#xff0c;如果读不到&#xff0c;就到数据库中去读取&#xff0c;从数据库中读到数据&#xff0c;也是先更…

制造执行系统(MES)在新能源领域的应用

制造执行系统&#xff08;MES&#xff09;在新能源领域有许多应用&#xff0c;特别是在管理、监控和优化新能源生产过程方面。新能源包括太阳能、风能、生物质能、地热能等。以下是一些MES在新能源方面的应用领域&#xff1a; 生产计划与调度&#xff1a;MES可以协助规划和调度…

谷粒商城第十一天-品牌管理中关联分类

目录 一、总述 二、前端部分 1. 调整查询调用 2. 关联分类 三、后端部分 四、总结 一、总述 之前是在商品的分类管理中直接使用的若依的逆向代码 有下面的几个问题&#xff1a; 1. 表格上面的参数填写之后&#xff0c;都是按照完全匹配进行搜索&#xff0c;没有模糊匹配…

计算机网络—HTTP

这里写目录标题 HTTP是什么HTTP常见状态码HTTP常见字段GET与POST的区别Get和Post是安全和幂等吗PUT幂等&#xff0c;不安全DELETE幂等&#xff0c;不是安全 HTTP缓存技术HTTP缓存实现技术 HTTP1.0优缺点和性能HTTP1.1优缺点和性能HTTP2优缺点和性能HTTP3优缺点和性能HTTP和HTTP…

vuex学习总结

一、vuex工作原理 工作流程&#xff1a;需求&#xff1a;改变组件count的sun变量的值&#xff0c;先调用dispatch函数传入jia函数和要改变的值给actions&#xff08;这个actions里面必须有jia这个函数&#xff09;&#xff1b;actions收到后调用commit函数将jia方法和值传给mut…

做BI领域的ChatGPT,思迈特升级一站式ABI平台

8月8日&#xff0c;以「指标驱动 智能决策」为主题&#xff0c;2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 ​ 后疫情时代&#xff0c;BI发展趋势的观察与应对 在发布会上&#xff0c;思迈特CEO吴华夫在开场致辞中表示&#xff0c;当前大环境背景下&#xf…