大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink DataSet
  • Flink DataSet 转换操作
  • Flink DataSet 输出
  • 容错机制、对比、发展方向

在这里插入图片描述

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。

通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。

Flink Window 总览

基本概念

  • Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
  • Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
  • 而Window窗口是从Streaming到Batch的一个桥梁。
  • Flink提供了非常完善的窗口机制
  • 在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
  • 当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
  • 在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
  • 窗口可以基于时间驱动、也可以基于事件驱动
  • 同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
  • Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

  • 获取流数据源
  • 获取窗口
  • 操作窗口数据
  • 输出窗口数据

滚动时间窗口

在这里插入图片描述

类型特点

将数据依据固定的窗口长度对数据进行切分:

  • 时间对齐
  • 窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。

关键点

  • 事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
  • 窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
  • 触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));timeWindow.apply(new MyTimeWindowFunction()).print();env.execute("TumblingWindow");}}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {/*** 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)* @author wzk* @date 16:58 2024/7/26**/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect("key: " + tuple.getField(0) + ", value: " + sum  +", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));}
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:

package icu.wzk;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.text.SimpleDateFormat;
import java.util.Random;public class TumblingWindow {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long timeMillis = System.currentTimeMillis();int random = new Random().nextInt(10);System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));return Tuple2.of(value, random);}});KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {@Overridepublic Tuple getKey(Tuple2<String, Integer> value) throws Exception {return Tuple1.of(value.f0);}});// 基于时间驱动 每隔 10秒 划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream.countWindow(3);globalWindow.apply(new MyCountWindowFuntion());env.execute("TumblingWindow");}}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {/*** 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。* @author wzk* @date 17:11 2024/7/26**/@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}// 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间long maxTimestamp = window.maxTimestamp();out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"+ maxTimestamp + "," + format.format(maxTimestamp));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/417439.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java利用JXL操作excel

通过JXL操作Excel JXL是韩国人所著,目前停止更新,只支持xls格式,即2007之前的版本 import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java…

[数据集][目标检测]玉米病害检测数据集VOC+YOLO格式6000张4类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;6000 标注数量(xml文件个数)&#xff1a;6000 标注数量(txt文件个数)&#xff1a;6000 标注…

《Linux运维总结:基于X86_64+ARM64架构CPU使用docker-compose一键离线部署consul 1.18.1容器版分布式ACL集群》

总结&#xff1a;整理不易&#xff0c;如果对你有帮助&#xff0c;可否点赞关注一下&#xff1f; 更多详细内容请参考&#xff1a;《Linux运维篇&#xff1a;Linux系统运维指南》 一、部署背景 由于业务系统的特殊性&#xff0c;我们需要面向不通的客户安装我们的业务系统&…

[数据集][目标检测]街道乱放广告牌检测数据集VOC+YOLO格式114张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;114 标注数量(xml文件个数)&#xff1a;114 标注数量(txt文件个数)&#xff1a;114 标注类别…

【数据结构】顺序表和链表——顺序表(包含丰富算法题)

文章目录 1. 线性表2. 顺序表2.1 概念与结构2.2 分类2.2.1 静态顺序表2.2.2 动态顺序表 2.3 动态顺序表的实现2.4 顺序表算法题2.4.1 移除元素2.4.2 删除有序数组中的重复项2.4.3 合并两个有序数组 2.5 顺序表问题与思考 1. 线性表 线性表&#xff08;linear list&#xff09;…

数据分析:R语言计算XGBoost线性回归模型的SHAP值

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍SHAP用途计算方法:应用加载R包导入数据数据预处理函数模型介绍 SHAP(SHapley Additive exPlanations)值是一种解释机器学习模型预测的方法。它基于博弈论中的Shapley值概念,…

Vulnhub:hacksudo2

靶机下载地址 信息收集 主机发现 nmap 192.168.31.0/24 -Pn -T4 靶机ip&#xff1a;192.168.31.188 端口扫描 nmap 192.168.31.188 -A -p- -T4 开放端口有80,111,1337(ssh),2049(nfs)。 目录扫描 访问http服务。 点击图片进入游戏。玩了一下没看到什么信息。 目录扫描。…

地理信息科学在考古学中的应用:GIS与遥感技术的时空穿梭之旅

在历史的长河中&#xff0c;每一片土地都承载着文明的记忆。随着科技的进步&#xff0c;地理信息科学&#xff08;GIS&#xff09;与遥感技术正逐渐揭开古老秘密的面纱&#xff0c;让沉睡千年的历史遗迹重新焕发光彩。今天&#xff0c;就让我们踏上一场穿越时空的旅程&#xff…

(一)使用Visual Studio创建ASP.NET Core WebAPI项目

1.创建webAPI项目 选择ASP.NET Core Web API项目模版&#xff08;基于.Core框架可以支持多种系统环境&#xff0c;所以我们选择.Core框架&#xff09;&#xff0c;点下一步。 2.项目名称 项目名称设置为&#xff1a;CoreWebAPI&#xff0c;点下一步 3.选择框架 选择.NET6.0框…

人机融合智能中的计算不可约性

计算的不可约性 是计算理论和复杂性科学中的一个重要概念&#xff0c;主要由 计算机科学家 和 数学家 提出和研究。它指的是在某些系统或过程的模拟中&#xff0c;没有简化或有效的方式来预测其行为&#xff0c;而必须逐步进行每一步的计算来获得结果。 不可约性定义&#xff1…

结构开发笔记(七):solidworks软件(六):装配摄像头、摄像头座以及螺丝,完成摄像头结构示意图

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/141931518 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

Java-IO:浅谈对NIO的认识

Java-IO&#xff1a;简述常见的IO模型 Java-IO&#xff1a;浅谈对IO的认识 NIO即New IO&#xff0c;这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的&#xff0c;但实现方式不同&#xff0c;NIO 主要用到的是块&#xff0c;所以NIO的效率要比IO高很多。在Java API中提供…

LabVIEW如何自学成为专业开发者

自学成为LabVIEW专业开发者需要一个系统化的学习和实践过程&#xff0c;以下是一些关键步骤&#xff1a; 1. 扎实的基础学习 了解LabVIEW的基础概念&#xff1a;首先要熟悉LabVIEW的基本操作、数据流编程理念和图形化编程环境。可以通过LabVIEW的官方教程、Bilibili上的视频课程…

Github 2024-09-02 开源项目周报 Top13

根据Github Trendings的统计,本周(2024-09-02统计)共有13个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目3TypeScript项目3Vue项目2Rust项目2Go项目2Dart项目1Jupyter Notebook项目1Shell项目1Dockerfile项目1PHP项目1Blade项目1AI.AppFlow…

Matlab三维图的坐标轴标签 自动平行坐标/自动旋转

下载解压工具包&#xff1a; https://www.mathworks.com/matlabcentral/fileexchange/49542-phymhan-matlab-axis-label-alignment 添加至MATLAB路径: 在三维绘图后增加下列语句即可 ax struct(Axes, gca); align_axislabel([],ax) h3d rotate3d; set(h3d,ActionPreCa…

认识正则表达式

为什么要学习正则表达式 因为爬虫需要&#xff01;&#xff01;&#xff01; 一般来说爬虫需要四个主要步骤&#xff1a; 明确目标 (要知道你准备在哪个范围或者网站去搜索)爬 (将所有的网站的内容全部爬下来)取 (去掉对我们没用处的数据)处理数据&#xff08;按照我们想要的方…

在Centos中的mysql的备份与恢复

1.物理备份 冷备份&#xff1a;关闭数据库时进行热备份&#xff1a;数据库运行时进行&#xff0c;依赖于数据库日志文件温备份&#xff1a;数据库不可写入但可读的状态下进行 2.逻辑备份 对数据库的表或者对象进行备份 3.备份策略 完全备份&#xff1a;每次都备份完整的数…

使用C语言实现字符推箱子游戏

使用C语言实现字符推箱子游戏 推箱子&#xff08;Sokoban&#xff09;是一款经典的益智游戏&#xff0c;玩家通过移动角色将箱子推到目标位置。本文将带你一步步用C语言实现一个简单的字符版本的推箱子游戏。 游戏规则 玩家只能推箱子&#xff0c;不能拉箱子。只能将箱子推到…

Unity界面、组件以及脚本

Unity界面 菜单栏 菜单栏&#xff1a;位于屏幕顶部&#xff0c;包含文件、编辑、资产、游戏对象、组件、地形、动画、图形、AI、窗口、工具和帮助等菜单项。 工具栏 工具栏&#xff1a;位于菜单栏下方&#xff0c;提供了快速访问常用功能的按钮&#xff0c;如播放、暂停、停止…

OpenGL/GLUT实践:实现反弹运动的三角形动画与键盘控制(电子科技大学信软图形与动画Ⅱ实验)

源码见GitHub&#xff1a;A-UESTCer-s-Code 文章目录 1 运行效果2 实验过程2.1 环境配置2.2 绘制三角形2.2.1 渲染函数2.2.2 主函数2.2.3 运行结果 2.3 调整窗口大小2.4 简单动画与按键控制2.4.1 简单旋转2.4.2 键盘控制 2.5 窗口反弹动画2.5.1 处理窗口大小变化2.5.2 渲染函数…