聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;/*** Called for every element that gets added to a pane. The result of this will determine whether* the pane is evaluated to emit results.** @param element The element that arrived.* @param timestamp The timestamp of the element that arrived.* @param window The window to which the element is being added.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)throws Exception;/*** Called when a processing-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception;/*** Called when an event-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)throws Exception;/*** Returns true if this trigger supports merging of trigger state and can therefore be used with* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.** <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,* OnMergeContext)}*/public boolean canMerge() {return false;}/*** Called when several windows have been merged into one window by the {@link* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.** @param window The new window that results from the merge.* @param ctx A context object that can be used to register timer callbacks and access state.*/public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}/*** Clears any state that the trigger might still hold for the given window. This is called when* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.*/public abstract void clear(W window, TriggerContext ctx) throws Exception;// ------------------------------------------------------------------------/*** A context object that is given to {@link Trigger} methods to allow them to register timer* callbacks and deal with state.*/public interface TriggerContext {// ...}/*** Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,* OnMergeContext)}.*/public interface OnMergeContext extends TriggerContext {<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}
}

关于上述方法,需要注意三件事:

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:

public enum TriggerResult {// 不触发,也不删除元素CONTINUE(false, false),// 触发窗口,窗口出发后删除窗口中的元素FIRE_AND_PURGE(true, true),// 触发窗口,但是保留窗口元素FIRE(true, false),// 不触发窗口,丢弃窗口,并且删除窗口的元素PURGE(false, true);// ------------------------------------------------------------------------private final boolean fire;private final boolean purge;TriggerResult(boolean fire, boolean purge) {this.purge = purge;this.fire = fire;}public boolean isFire() {return fire;}public boolean isPurge() {return purge;}
}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

1.3 ProcessingTimeTrigger源码分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/** Creates a new trigger that fires once system time passes the end of the window. */public static ProcessingTimeTrigger create() {return new ProcessingTimeTrigger();}
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

EventTimeTriggerr在onElement设置的定时器:

在这里插入图片描述
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。

二、移除器(Evictor)

2.1 Evictor扮演的角色
在这里插入图片描述
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。

Evictor接口定义如下:

在这里插入图片描述
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。

2.2 Flink内置的Evitor

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (size <= maxCount) {// 小于最大数量,不做处理return;} else {int evictedCount = 0;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){iterator.next();evictedCount++;if (evictedCount > size - maxCount) {break;} else {// 移除前size - maxCount个元素,只剩下最后maxCount个元素iterator.remove();}}}
}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {// 获取最后一个元素TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){TimestampedValue<T> element = iterator.next();// 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较// 若计算结果大于threshold值或者是相等,则该元素会被移除if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}
}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:

在这里插入图片描述
TimeEvictor的代码实现如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {// 如果element没有timestamp,直接返回if (!hasTimestamp(elements)) {return;}// 获取elements中最大的时间戳(到来最晚的元素的时间)long currentTime = getMaxTimestamp(elements);// 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();// 清除所有时间戳小于截止时间的元素if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479595.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker部署nginx,并配置SSL证书

、拉取nginx镜像 docker pull nginx:latest 在此过程中会遇到网络的问题&#xff0c;导致镜像无法下载&#xff0c;这时候需要在服务器中配置下国内的镜像地址。下面包含近期最新的国内镜像&#xff0c;截至2024年11月27日&#xff1a; "https://<你的阿里云账号ID&…

OceanBase 大数据量导入(obloader)

现需要将源数据库&#xff08;Oracle|MySQL等&#xff09;一些表的海量数据迁移到目标数据库 OceanBase 中&#xff0c;基于常规 jdbc 驱动编码的方式涉及开发工作&#xff0c;性能效率也要看编码的处理机制。 OceanBase 官方提供了的 OceanBase Migration Service (OMS) 数据…

【Spring MVC】如何获取cookie/session以及响应@RestController的理解,Header的设置

前言 &#x1f31f;&#x1f31f;本期讲解关于SpringMVC的编程之参数传递~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么废…

【详细介绍及演示】Flink之checkpoint检查点的使用

目录 一、介绍 二、 设置checkpoint检查点演示 1、 代码演示 2、测试代码效果 3、查看快照情况 ​编辑 三、在集群上运行 1、第一次运行 2、第二次运行 四、自定义检查点savePoint 1、提交一个flink job 打成jar包 2、输入一些数据&#xff0c;观察单词对应的数字的…

JAVA篇05 —— 内部类(Local、Anonymous、Member、Static)

欢迎来到我的主页&#xff1a;【一只认真写代码的程序猿】 本篇文章收录于专栏【小小爪哇】 如果这篇文章对你有帮助&#xff0c;希望点赞收藏加关注啦~ 目录 1 内部类Inner Class 1.1 局部内部类 1.2 匿名内部类&#xff08;※※&#xff09; 1.3 匿名类最佳实践&#xf…

Spring Boot 与 Spring Cloud Alibaba 版本兼容对照

版本选择要点 Spring Boot 3.x 与 Spring Cloud Alibaba 2022.0.x Spring Boot 3.x 基于 Jakarta EE&#xff0c;javax.* 更换为 jakarta.*。 需要使用 Spring Cloud 2022.0.x 和 Spring Cloud Alibaba 2022.0.x。 Alibaba 2022.0.x 对 Spring Boot 3.x 的支持在其发行说明中…

jsp的pageContext对象

jsp的pageContext对象 是页面的上下文对象&#xff0c;表示当前页面运行环境&#xff0c;用于获取当前页面jsp页面信息&#xff0c;作用范围为当前的jsp页面 pageContext对象可以访问当前页面的所有jsp内置对象 jsp的四种内置对象 4中作用域&#xff1a;pagecontext,request…

网络安全在数字时代保护库存数据中的作用

如今&#xff0c;通过软件管理库存已成为一种标准做法。企业使用数字工具来跟踪库存水平、管理供应链和规划财务。 然而&#xff0c;技术的便利性也带来了网络威胁的风险。黑客将库存数据视为有价值的目标。保护这些数据不仅重要&#xff0c;而且必不可少。 了解网络安全及其…

Python图像处理:打造平滑液化效果动画

液化动画中的强度变化是通过在每一帧中逐渐调整液化效果的强度参数来实现的。在提供的代码示例中&#xff0c;强度变化是通过一个简单的线性插值方法来控制的&#xff0c;即随着动画帧数的增加&#xff0c;液化效果的强度也逐渐增加。 def liquify_image(image, center, radius…

day2全局注册

全局注册代码&#xff1a; //文件核心作用&#xff1a;导入App.vue,基于App.vue创建结构渲染index.htmlimport Vue from vue import App from ./App.vue //编写导入的代码&#xff0c;往代码的顶部编写&#xff08;规范&#xff09; import HmButton from ./components/Hm-But…

wireshark基础

免责声明&#xff1a; 笔记的只是方便各位师傅学习知识&#xff0c;以下代码、网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 泷羽sec官网&#xff1a;https://longyusec.com/ 泷羽sec B站地址&#xff1a;https:/…

学习笔记037——Java中【Synchronized锁】

文章目录 1、修饰方法1.1、静态方法&#xff0c;锁定的是类1.2、非静态方法&#xff0c;锁定的是方法的调用者&#xff08;对象&#xff09; 2、修饰代码块&#xff0c;锁定的是传入的对象2.1、没有锁之前&#xff1a;2.2、有锁后&#xff1a; 实现线程同步&#xff0c;让多个线…

⭐️ GitHub Star 数量前十的工作流项目

文章开始前&#xff0c;我们先做个小调查&#xff1a;在日常工作中&#xff0c;你会使用自动化工作流工具吗&#xff1f;&#x1f64b; 事实上&#xff0c;工作流工具已经变成了提升效率的关键。其实在此之前我们已经写过一篇博客&#xff0c;跟大家分享五个好用的工作流工具。…

智能桥梁安全运行监测系统守护桥梁安全卫士

一、方案背景 桥梁作为交通基础设施中不可或缺的重要组成部分&#xff0c;其安全稳定的运行直接关联到广大人民群众的生命财产安全以及整个社会的稳定与和谐。桥梁不仅是连接两地的通道&#xff0c;更是经济发展和社会进步的重要纽带。为了确保桥梁的安全运行&#xff0c;桥梁安…

[创业之路-155] :《领先的密码-BLM方法论全面解读与应用指南》- 综合管理框架

目录 一、BLM&#xff08;业务领先模型&#xff09;综合管理框架 1、BLM模型的起源与发展 2、BLM模型的核心组成 3、BLM模型的战略规划与执行流程 4、BLM模型的应用价值 二、BLM&#xff08;业务领先模型&#xff09;实施案例 1. 华为的实施案例 2. 某知名企业A的实施案…

用Java爬虫“搜刮”工厂数据:一场数据的寻宝之旅

引言&#xff1a;数据的宝藏 在这个数字化的时代&#xff0c;数据就像是隐藏在数字丛林中的宝藏&#xff0c;等待着勇敢的探险家去发掘。而我们&#xff0c;就是那些手持Java魔杖的现代海盗&#xff0c;准备用我们的爬虫船去征服那些数据的海洋。今天&#xff0c;我们将一起踏…

redis 底层数据结构

概述 Redis 6 和 Redis 7 之间对比&#xff1a; Redis6 和 Redis7 最大的区别就在于 Redis7 已经用 listpack 替代了 ziplist. 以下是基于 Redis 7基础分析。 RedisObject Redis是⼀个<k,v>型的数据库&#xff0c;其中key通常都是string类型的字符串对象&#xff0c;⽽…

UE5 实现组合键触发事件的方法

因为工作原因。 需要用大括号{和}来触发事件 但是在蓝图中搜了一下&#xff0c;发现键盘事件里根本就没有{}这两个键。 花费了一下午&#xff0c;终于找到解决的方法了&#xff0c;也就是增强输入的弦操作 首先创建一个项目 纯蓝图或者C都可行 进入到内容浏览器的默认页面 …

使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用

文章目录 一、前言二、 工具准备&#xff1a;三、最终效果示例四、具体步骤第一大部分是配置阿里云1. 首先登录阿里云容器镜像服务 [服务地址](https://cr.console.aliyun.com/cn-hangzhou/instances)2. 选择个人版本3. 创建 命名空间4. 进入访问凭证来查看&#xff0c;用户名字…

用两个栈实现队列 剑指offer

题目描述 用两个栈实现一个队列。队列声明如下图&#xff0c;请实现它的两个函数appendTail和deleteHead,分别完成在队尾插入节点和队头删除节点的功能。 代码实现 测试用例 相关题目