flink学习(7)——window

 概述

窗口的长度(大小): 决定了要计算最近多长时间的数据

窗口的间隔: 决定了每隔多久计算一次

举例:每隔10min,计算最近24h的热搜词,24小时是长度,每隔10分钟是间隔。

窗口的分类

1、根据window前是否调用keyBy分为键控窗口和非键控窗口

2、根据window中参数的配置分为基于时间的,基于条数的,会话窗口

SlidingProcessingTimeWindows —— 滑动窗口,按照处理时间

TumblingProcessingTimeWindows —— 滚动窗口,按照处理时间

ProcessingTimeSessionWindows —— 会话窗口

 Keyed Window --键控窗口

// Keyed Window
stream.keyBy(...)              <-  按照一个Key进行分组.window(...)            <-  将数据流中的元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream.windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process()      <-  窗口处理函数Window Function

方括号([…]) 中的命令是可选的。

首先:我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。

经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。

windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。

决定是否分组之后,窗口的后续操作基本相同。

经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1

Flink窗口的骨架结构中有两个必须的两个操作:

  • 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。

  • 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常用的Window Function有reduce、aggregate、process。

其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项,主要面向需要更多自定义的高级编程者,如果不设置则会使用默认的配置。

 

 

基于时间的窗口 

滚动窗口- TumblingWindow概念

package com.bigdata.day04;public class _01_windows {/*** 1、实时统计每个红绿灯通过的汽车数量* 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动* 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String line) throws Exception {String[] words = line.split(" ");return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));}}).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {@Overridepublic Integer getKey(Tuple2<Integer, Integer> value) throws Exception {return value.f0;}})// 基于这个部分实现 滚动窗口 每一分钟 统计前一分钟的数据.window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1).print();env.execute();}
}

滑动窗口– SlidingWindow概念 

package com.bigdata.day04;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;/*** @基本功能:* @program:flinkProject* @author: jinnian* @create:2024-11-25 10:13:46**/
public class _01_windows {/*** 1、实时统计每个红绿灯通过的汽车数量* 2、实时统计每个红绿灯每个1分钟,统计最近1分钟通过的汽车数量 ——滚动* 3、实时统计每个红绿灯每个1分钟,统计最近2分钟通过的汽车数量 ——滑动*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换socketStream.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String line) throws Exception {String[] words = line.split(" ");return new Tuple2<>(Integer.parseInt(words[0]),Integer.parseInt(words[1]));}}).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {@Overridepublic Integer getKey(Tuple2<Integer, Integer> value) throws Exception {return value.f0;}})// 基于这一部分实现,每30秒统计前一分钟的数据,大的在前,小的在后.window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30))).sum(1).print();//5. execute-执行env.execute();}
}
如何显示窗口时间——apply

——apply将reduce替代

kafka生产数据
package com.bigdata.day04;public class _02_kafka生产数据 {public static void main(String[] args) throws InterruptedException {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 5000; i++) {int index = random.nextInt(arr.length);ProducerRecord<String, String> record = new ProducerRecord<>("edu", arr[index]);producer.send(record);Thread.sleep(30);}}
}
flink消费数据
package com.bigdata.day04;public class _02_flink消费数据 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "gw2");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String value) throws Exception {return Tuple2.of(value,1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))//.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))/****/.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {StringBuilder sb = new StringBuilder();long start = window.getStart();long end = window.getEnd();String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum +=tuple2.f1;}sb.append("开始时间:"+startStr+",").append("结束时间:"+endStr+",").append("key: "+key+ ",").append("数量:"+sum);out.collect(sb.toString());}}).print();env.execute();}
}

基于条数的窗口——countWindow

package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

会话窗口

package com.bigdata.day04;public class _03_会话窗口 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);DataStreamSource<String> source = env.socketTextStream("localhost", 8889);source.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] s = value.split(" ");return Tuple2.of(s[0],Integer.valueOf(s[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}// 1、主要就是 ProcessingTimeSessionWindows 参数的使用// 2、使用 EventTimeSessionWindows的时候,若没有水印就不会有结果}).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0,value1.f1+value2.f1);}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479177.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【8210A-TX2】Ubuntu18.04 + ROS_ Melodic + TM-16多线激光 雷达评测

简介&#xff1a;介绍 TM-16多线激光雷达 在8210A载板&#xff0c;TX2核心模块环境&#xff08;Ubuntu18.04&#xff09;下测试ROS驱动&#xff0c;打开使用RVIZ 查看点云数据&#xff0c;本文的前提条件是你的TX2里已经安装了ROS版本&#xff1a;Melodic。 大家好&#xff0c;…

【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法

毕业设计排版时&#xff0c;一般要求每章节的起始页为奇数页&#xff0c;空白页不显示页眉和页脚。具体做法如下&#xff1a; 1 Word 在一个章节的内容完成后&#xff0c;在【布局】中&#xff0c;点击【分隔符】&#xff0c;然后选择【奇数页】 这样在下一章节开始的时&…

【GAMES101笔记速查——Lecture 20 Color and Perception】

颜色与感知 目录 1 光场&#xff08;Light Field / Lumigraph&#xff09; 1.1 全光函数 1.1.1 改进&#xff1a;引入波长 1.1.2 改进&#xff1a;添加时间t 1.1.3 改进&#xff1a;人可以移动&#xff0c;添加空间坐标 1.1.4 改进&#xff1a;不把函数当电影来看。 1.…

HTML5和CSS3新增特性

HTML5的新特性 HTML5新增的语义化标签 HTML5 的新增特性主要是针对于以前的不足&#xff0c;增加了一些新的标签、新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是 IE9 以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量…

ArcGIS+deck.gl矢量切片三维化表示建筑白模

01 背景介绍 很多ArcGIS API for JavaScript的用户想要ArcGIS的矢量切片技术体系实现Mapbox gl将城市建筑物footprint矢量切片三维化成建筑白模的效果。效果如图&#xff1a;截图来自mapbox studio1但目前仅靠ArcGIS VectorTileServer 和 ArcGIS API for JavaScript本身无法达…

Windows下安装FreeSurfer教程

简介 FreeSurfer 是一个开源软件包&#xff0c;用于分析和可视化横断面和纵向研究的结构、功能和扩散神经成像数据。它由Athinoula A. Martinos 生物医学成像中心的计算神经成像实验室开发。 官网 功能 FreeSurfer 为结构 MRI 数据提供完整的处理流&#xff0c;包括&#xf…

RTMP协议

背景介绍 RTMP&#xff08;Real Time Messaging Protocol&#xff09; 是由 Adobe 公司基于 Flash Player 播放器对应的音视频 flv 封装格式提出的一种&#xff0c;基于TCP 的数据传输协议。本身具有稳定、兼容性强、高穿透的特点。常被应用于流媒体直播、点播等场景。常用于推…

计算机网络----基本概念

基本概念 在这一章从整体上介绍计算机网络的概况, 为后续的学习搭建起整体的框架; 介绍计算机网络中的基础术语和概念; 什么是因特网 『 因特网 』是一个世界范围内互联了数以亿计的计算设备的计算机网络; 因特网具体构成 因特网互联了数以亿计的计算设备, 这些设备被称为…

CKA认证 | Day4 K8s管理应用生命周期(下)

第四章 K8s管理应用程序生命周期&#xff08;下&#xff09; 1、Pod对象 1.1 Pod 的基本概念 Pod 是 Kubernetes 中最基本和最重要的概念之一&#xff0c;是一个逻辑抽象概念&#xff0c;Kubernetes创建和管理的最小单元&#xff0c; 一个Pod由一个容器或多个容器组成。它简…

【微服务】Nacos

一、安装 1、官网地址&#xff1a;https://nacos.io/download/nacos-server/ 2、启动&#xff1a;找到bin目录下的startup.cmd双击启动&#xff0c;或者打开一个命令窗口输入&#xff1a; startup.cmd -m standalone双击启动后如下&#xff1a;可以访问控制台地址 访问后的…

学习笔记032——Spring学习笔记

文章目录 一、Spring开发步骤二、Spring配置文件1、Bean标签基本配置2、Bean标签范围配置3、Bean生命周期配置4、Bean实例化三种方式5、Bean的依赖注入概念6、Bean的依赖注入方式【第一种&#xff1a;set方法注入】【第二种&#xff1a;构造方法注入】 7、Bean的依赖注入的数据…

某科技研发公司培训开发体系设计项目成功案例纪实

某科技研发公司培训开发体系设计项目成功案例纪实 ——建立分层分类的培训体系&#xff0c;加强培训跟踪考核&#xff0c;促进培训成果实现 【客户行业】科技研发行业 【问题类型】培训开发体系 【客户背景】 某智能科技研发公司是一家专注于智能科技、计算机软件技术开发与…

Elasticsearch:Retrievers 介绍

检索器&#xff08;retrievers&#xff09;是 Elasticsearch 中搜索 API 中添加的新抽象层。它们提供了在单个 _search API 调用中配置多阶段检索管道的便利。此架构通过消除对复杂搜索查询的多个 Elasticsearch API 调用的需求&#xff0c;简化了应用程序中的搜索逻辑。它还减…

Python学习34天

import random class Game: peo0 rob0 # # def __init__(self,peo,rob): # self.peopeo # self.robrob def Play(self): """ 石头剪刀布游戏&#xff0c;0代表石头&#xff0c;1代见到&#xff0c;2代表石头 …

hive的存储格式

1&#xff09; 四种存储格式 hive的存储格式分为两大类&#xff1a;一类纯文本文件&#xff0c;一类是二进制文件存储。 Hive支持的存储数据的格式主要有&#xff1a;TEXTFILE、SEQUENCEFILE、ORC、PARQUET 第一类&#xff1a;纯文本文件存储 textfile: 纯文本文件存储格式…

solr 远程命令执行 (CVE-2019-17558)

目录 漏洞描述 执行漏洞py脚本&#xff0c;取得shell连接 EXP 漏洞描述 Apache Velocity是一个基于Java的模板引擎&#xff0c;它提供了一个模板语言去引用由Java代码定义的对象。Velocity是Apache基金会旗下的一个开源软件项目&#xff0c;旨在确保Web应用程序在表示层和业…

uname -m(machine) 命令用于显示当前系统的机器硬件架构(Unix Name)

文章目录 关于 arm64 架构检查是否安装了 Rosetta 2其他相关信息解释&#xff1a;命令功能&#xff1a;示例&#xff1a; dgqdgqdeMac-mini / % uname -m arm64您运行的 uname -m 命令显示您的系统架构是 arm64。这意味着您的 Mac Mini 使用的是 Apple 的 M1 或更新的芯片&…

实现在两台宿主机下的docker container 中实现多机器通讯

基于我的实验背景 上位机&#xff1a;ubuntu 20.04 (docker humble 22.04) 下位机&#xff1a;ubuntu 22.04&#xff08;docker noetic 20.04&#xff09; 目标&#xff1a;实现在上位机中的docker container 容器的22.04环境去成功远程访问 非同网段的下位机的20.04的contai…

(计算机组成原理)期末复习

第一章 计算机的基本组成&#xff1a;硬件软件&#xff08;程序&#xff09;计算机系统 软件有系统软件&#xff08;系统管理工具&#xff09;&#xff0c;应用软件 计算机硬件&#xff1a;包括主机和外设&#xff0c;主机包括CPU和内存&#xff0c;***CPU由运算器和控制器所组…

QML TableView 实例演示 + 可能遇到的一些问题(Qt_6_5_3)

一、可能遇到的一些问题 Q1&#xff1a;如何禁用拖动&#xff1f; 在TableView下加一句代码即可&#xff1a; interactive: false 补充&#xff1a;这个属性并不专属于TableView&#xff0c;而是一个通用属性。很多Controls下的控件都可以使用&#xff0c;其主要作用就是控…