Flink窗口分配器WindowAssigner

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/453833.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++篇】栈的层叠与队列的流动:在 STL 的节奏中聆听算法的静谧旋律

文章目录 C 栈与队列详解&#xff1a;基础与进阶应用前言第一章&#xff1a;栈的介绍与使用1.1 栈的介绍1.2 栈的使用1.2.1 最小栈1.2.2 示例与输出 1.3 栈的模拟实现 第二章&#xff1a;队列的介绍与使用2.1 队列的介绍2.2 队列的使用2.2.1 示例与输出 2.3 队列的模拟实现2.3.…

【linux】线程(二)

10. pthread_t 类型 注意&#xff1a; 每一个线程的库级别的tcb的起始地址&#xff0c;就是线程的 tid每一个线程都有自己独立的栈结构线程和线程之间&#xff0c;也是可以被其他线程看到并访问的&#xff08;比如全局函数&#xff09; 代码 如果想要进程拥有私人的全局变量(即…

拥抱“新市民” ,数字银行的“谋与变”

【潮汐商业评论/原创】 数字银行&#xff0c;既是金融行业的创新物种&#xff0c;其在发展的过程中也彰显着普惠金融的基因。 “我劝你买点银行理财吧&#xff0c;选一家靠谱的银行就是最靠谱的理财方式了&#xff0c;踏踏实实地把钱存银行里面不会有问题的”&#xff0c;周日…

SpringBoot篇(二、制作SpringBoot程序)

目录 一、代码位置 二、四种方式 1. IDEA联网版 2. 官网 3. 阿里云 4. 手动 五、在IDEA中隐藏指定文件/文件夹 六、复制工程-快速操作 七、更改引导类别名 一、代码位置 二、四种方式 1. IDEA联网版 2. 官网 官网制作&#xff1a;Spring Boot 3. 阿里云 阿里云版制…

react18中的计算属性及useMemo的性能优化技巧

react18里面的计算属性和使用useMemo来提升组件性能的方法 计算属性 实现效果 代码实现 函数式组件极简洁的实现&#xff0c;就这样 import { useState } from "react"; function FullName() {const [firstName, setFirstName] useState("");const [la…

AlDente Pro for Mac电脑 充电限制保护工具 安装教程【简单,轻松上手】

Mac分享吧 文章目录 AlDente Pro for Mac 充电限制保护工具 安装完成&#xff0c;软件打开效果一、AlDente Pro for Mac 充电限制保护工具 Mac电脑版——v1.28.41️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件&#xff0c;将安装包从左侧拖入右侧文件夹中&#xff0c;等…

c++初阶--string类(使用)

大家好&#xff0c;许久不见&#xff0c;今天我们来学习c中的string类&#xff0c;在这一部分&#xff0c;我们首先应该学习一下string类的用法&#xff0c;然后再试着自己去实现一下string类。 在这里&#xff0c;我使用的是这个网站来查找的string类&#xff0c;这里面的内容…

Web,RESTful API 在微服务中的作用是什么?

大家好&#xff0c;我是锋哥。今天分享关于【Web&#xff0c;RESTful API 在微服务中的作用是什么&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; Web&#xff0c;RESTful API 在微服务中的作用是什么&#xff1f; 在微服务架构中&#xff0c;Web 和 RESTful …

react18中如何实现同步的setState来实现所见即所得的效果

在react项目中&#xff0c;实现添加列表项&#xff0c;最后一项自动显示在可视区域范围&#xff01;&#xff01; 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…

基于SSM网络在线考试系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;学生管理&#xff0c;在线考试管理&#xff0c;试题管理&#xff0c;考试管理&#xff0c;系统管理 前台账号功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;在线考试&#xff0c;公告信…

word删除空白页 | 亲测有效

想要删掉word里面的末尾空白页&#xff0c;但是按了delete之后也没有用 找了很久找到了以下亲测有效的方法 1. 通过鼠标右键在要删除的空白页面处显示段落标记 2. 在字号输入01&#xff0c;按ENTER&#xff08;回车键&#xff09; 3.成功删除了&#xff01;&#xff01;

ECharts饼图-饼图34,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个饼图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供详…

模型实战(27)之 YOLO11 推理、验证及训练自己的数据集

模型实战(27)之 YOLO11推理、验证及训练自己的数据集 2024年10.17,YOLO11是近期十月份刚经ultralytics团队更新优化发布的视觉算法深度学习网络模型,其网络模型结构代码实现也采用了比较新的Python数据结构,所以虚拟环境搭建安装包也比较新,经过多次踩坑,把关键环节记录…

电子便签:从偶像剧到职场的实用转变

在快节奏的现代生活中&#xff0c;便签已经成为了我们不可或缺的助手&#xff0c;无论是纸质的还是电子的&#xff0c;它们都以小巧的“身躯”承载着我们的日常记忆和待办事项。从偶像剧中常见的“便利贴”女孩形象&#xff0c;到如今电子便签的普及&#xff0c;它们帮助我们捕…

百度搜索竞价推广有必要做吗?怎么做效果好!

百度竞价推广&#xff0c;有的行业适合&#xff0c;有的行业则不行&#xff0c;下面我给大家分享下哪些行业可以。 如果是招商加盟、招代理商、招经销商&#xff0c;或者是高客单价咨询服务费&#xff0c;甚至是找合作方、渠道方的企业主都可以投放竞价推广。 总之一句话&…

网络安全的五大误区,你中招了吗?

在数字化时代&#xff0c;网络安全问题日益突出&#xff0c;许多人在使用网络过程中存在一些误区&#xff0c;导致个人信息泄露、财产损失等问题。本文将为您揭示网络安全的五大误区&#xff0c;帮助您提高安全防范意识。 误区一&#xff1a;使用复杂密码就一定安全 许多人认为…

基于SpringBoot+Vue+uniapp微信小程序的垃圾分类系统的详细设计和实现(源码+lw+部署文档+讲解等)

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…

万户ezEIP企业管理系统 productlist.aspx SQL注入漏洞复现

0x01 产品简介 万户ezEIP是一种企业资源规划软件,旨在帮助企业管理其各个方面的业务流程。它提供了一套集成的解决方案,涵盖了财务、供应链管理、销售和市场营销、人力资源等各个领域。 0x02 漏洞概述 万户ezEIP企业管理系统 productlist.aspx 接口存在SQL注入漏洞,未经身…

11_原始值的响应式方案-ref

目录 引入 ref解决响应丢失的问题自动脱 ref 引入 ref 在之前实现的 reactive 方法&#xff0c;其代理的目标必须是一个非原始值才行&#xff0c;例如&#xff1a; let str vue // 无法拦截 str 的修改 str vue3上述这个例子表达的意思就是&#xff0c;我们还缺少一个能够对…

ZYNQ:流水灯实验

实验目的 PL_LED0 和 PL_LED1 连接到 ZYNQ 的 PL 端&#xff0c;PL_LED0 和 PL_LED1循环往复产生流水灯的效果&#xff0c;流水间隔时间为 0.5s。 原理图 程序设计 本次实验是需要实现两个LED的循环熄灭点亮&#xff0c;时间间隔是0.5S,对时间间隔的控制使用计数器来完成。本…