【大数据】详解 Flink 中的 WaterMark

详解 Flink 中的 WaterMark

  • 1.基础概念
    • 1.1 流处理
    • 1.2 乱序
    • 1.3 窗口及其生命周期
    • 1.4 Keyed vs Non-Keyed
    • 1.5 Flink 中的时间
  • 2.Watermark
    • 2.1 案例一
    • 2.2 案例二
    • 2.3 如何设置最大乱序时间
    • 2.4 延迟数据重定向
  • 3.在 DDL 中的定义
    • 3.1 事件时间
    • 3.2 处理时间

1.基础概念

1.1 流处理

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

  • 有界数据对应批处理,API 对应 DateSet
  • 无界数据对应流处理,API 对应 DataStream

1.2 乱序

什么是乱序呢?

可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order 或者说 late element)。

✅ 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有 5 秒的延时,也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来(比如到 Window 处理节点)。例如,有 1 ~ 10 个事件,乱序到达的序列是:2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7。

1.3 窗口及其生命周期

对于 Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于 Spark 和 Flink 都产生了窗口计算。

比如,因为我们想看到过去一分钟或过去半小时的访问数据,这时候我们就需要窗口。

  • Window:Window 是处理无界流的关键,Window 将流拆分为一个个有限大小的 buckets,可以在每一个 buckets 中进行计算。
  • 当 Window 是时间窗口的时候,每个 Window 都会有一个开始时间(start_time)和结束时间(end_time)(左闭右开),这个时间是系统时间。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

窗口有如下组件:

  • Window Assigner:用来决定某个元素被分配到哪个或哪些窗口中去。
  • Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于 “当窗口中的元素数量大于 4” 时,或 “当水位线通过窗口结束时”。
  • Evictor:驱逐器。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。

窗口还拥有函数,比如 ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

1.4 Keyed vs Non-Keyed

在定义窗口之前,要指定的第一件事是流是否需要 Keyed,使用 keyBy(...) 将无界流分成逻辑的 keyed stream。如果未调用 keyBy(...),则表示流不是 keyed stream

  • 对于 Keyed 流,可以将传入事件的任何属性用作 key。拥有 keyed stream 将允许窗口计算由多个任务并行执行,因为每个逻辑 Keyed 流可以独立于其余任务进行处理。相同 Key 的所有元素将被发送到同一个任务。
  • 在 Non-Keyed 流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为 1。

1.5 Flink 中的时间

Flink 在流处理程序支持不同的时间概念。分别为是 事件时间Event Time)、处理时间Processing Time)、提取时间Ingestion Time)。

从时间序列角度来说,发生的先后顺序是:事件时间提取时间处理时间

  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入 Apache Flink 流处理系统的时间,也就是 Flink 读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是 Flink 程序处理该事件时当前系统时间。

2.Watermark

Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。Watermark 是用于处理乱序事件或延迟数据的,这通常用 Watermark 机制结合 Window 来实现(Watermarks 用来触发 Window 窗口计算)。

2.1 案例一

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3000; // 3.0 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness bound// 生成 watermarkreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}

在这里插入图片描述
上图中是一个 10s 大小的窗口,1000020000 为一个窗口。当 EventTime 为 23000 的数据到来,生成的 WaterMark 的时间戳为 20000,大于等于 window_end_time,会触发窗口计算。

2.2 案例二

public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 3000; // 3 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);}
}

在这里插入图片描述
只是简单的用当前系统时间减去最大延迟时间生成 Watermark ,当 WaterMark 为 20000 时,大于等于窗口的结束时间,会触发 1000020000 窗口计算。再当 EventTime 为 19500 的数据到来,它本应该是属于窗口 1000020000 窗口的,但这个窗口已经触发计算了,所以此数据会被丢弃。

2.3 如何设置最大乱序时间

虽说水位线表明着早于它的事件不应该再出现,接收到水位线以前的的消息是不可避免的,这就是所谓的 迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有 3 种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。将迟到事件收集起来另外处理。将迟到事件视为错误消息并丢弃。Flink 默认的处理方式是直接丢弃,其他两种方式分别使用 Side OutputAllowed Lateness
  • Side Output 机制 可以将迟到事件单独放入一个数据流分支,这会作为 Window 计算结果的副产品,以便用户获取并对其进行特殊处理。
  • Allowed Lateness 机制 允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间迟到的事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

这里总结机制为:

  • 窗口 Window 的作用是为了周期性的获取数据。
  • WaterMark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延迟一段时间。
  • sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(100);DataStream<String> socketStream = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Long>> resultStream = socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {@Overridepublic long extractTimestamp(String element) {long eventTime = Long.parseLong(element.split(" ")[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return Tuple2.of(value.split(" ")[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟处理2秒.reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});resultStream.print();env.execute();}
}

在这里插入图片描述
watermark 为 21000 时,触发了 [10000, 20000) 窗口计算,由于设置了 allowedLateness(Time.seconds(2)),即允许两秒延迟处理,watermark < window_end_time + lateTime 公式得到满足,因此随后 10000 和 12000 进入窗口时,依然能触发窗口计算;随后 watermark 增加到 22000,watermark < window_end_time + lateTime 不再满足,因此 11000 再次进入窗口时,窗口不再进行计算。

2.4 延迟数据重定向

流的返回值必须是 SingleOutputStreamOperator,其是 DataStream 的子类。通过 getSideOutput 方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。

public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);DataStream<String> socketStream = env.socketTextStream("localhost", 9999);//保存被丢弃的数据OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};//注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream// Time.seconds(3)有序的情况修改为0.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {@Overridepublic long extractTimestamp(String element) {long eventTime = Long.parseLong(element.split(" ")[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return Tuple2.of(value.split(" ")[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sideOutputLateData(outputTag) // 收集延迟大于2s的数据.allowedLateness(Time.seconds(2)) //允许2s延迟.reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});resultStream.print();//把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);sideOutput.print();env.execute();}
}

3.在 DDL 中的定义

3.1 事件时间

事件时间属性是通过 CREATE TABLE DDL 语句中的 WATERMARK 语句定义的。水印语句在现有事件时间字段上定义 水印生成表达式,将事件时间字段标记为事件时间属性。

Flink SQL 支持在 TIMESTAMPTIMESTAMP_LTZ 列上定义事件时间属性。如果源中的时间戳数据以 年-月-日-时-分-秒 表示,通常是不含时区信息的字符串值,例如 2020-04-15 20:13:40.564,建议将事件-时间属性定义为 TIMESTAMP 列。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- Declare the user_action_time column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果数据源中的时间戳数据以纪元时间表示,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列。

CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- Declare the time_ltz column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

3.2 处理时间

处理时间能让表格程序根据本地机器的时间产生结果。这是最简单的时间概念,但会产生非确定性结果。处理时间不需要提取时间戳或生成水印。

CREATE TABLE DDL 语句中,使用系统 PROCTIME() 函数将处理时间属性定义为计算列。函数返回类型为 TIMESTAMP_LTZ

CREATE TABLE user_actions (user_name STRING,data STRING,-- Declare an additional field as a processing-time attribute.user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++数据结构与算法——数组

C第二阶段——数据结构和算法&#xff0c;之前学过一点点数据结构&#xff0c;当时是基于Python来学习的&#xff0c;现在基于C查漏补缺&#xff0c;尤其是树的部分。这一部分计划一个月&#xff0c;主要利用代码随想录来学习&#xff0c;刷题使用力扣网站&#xff0c;不定时更…

Linux系列之查看cpu、内存、磁盘使用情况

查看磁盘空间 df命令用于显示磁盘分区上的可使用的磁盘空间。默认显示单位为KB。可以利用该命令来获取硬盘被占用了多少空间&#xff0c;目前还剩下多少空间等信息。使用df -h命令&#xff0c;加个-h参数是为了显示GB MB KB单位&#xff0c;这样更容易查看 Filesystem …

【3DGS】从新视角合成到3D Gaussian Splatting

文章目录 引言&#xff1a;什么是新视角合成任务定义一般步骤NeRF的做法NeRF的三维重建NeRF的渲染 3DGS的三维重建从一组图片估计点云高斯点云模型球谐函数参数优化损失函数和协方差矩阵的优化高斯点的数量控制(Adaptive Density Control)新的问题 3DGS的渲染&#xff1a;快速可…

【行业应用-智慧零售】东胜物联餐饮门店智能叫号解决方案,为企业智能化升级管理服务

随着科技的不断进步&#xff0c;物联网设备已经广泛应用于各行各业&#xff0c;包括餐饮业。在餐饮门店的线下运营过程中&#xff0c;叫号系统是一项重要的设备需求。传统的叫号方式往往会消耗大量的人力和时间&#xff0c;而物联网技术为餐饮行业提供了一种更高效、智能化的解…

Atcoder ABC338 A-D题解

又是一篇姗姗来迟的atcoder题解。 Link:ABC338 Problem A: 妥妥的签到题。 #include <bits/stdc.h> using namespace std; int main(){string str;cin>>str;if(int(str[0])<65 || int(str[0])>90){cout<<"NO"<<endl;return 0;}for…

AIGC,ChatGPT4 实际需求效办公自动化函数应用

用实际需求来给大家演示一下ChatGPT如何助力办应用。 首先我们来提取年份值 我们将公式复制到表格即可。 接下来进行向下填充。 就得到了所有年份&#xff0c; 接下来我们完成第二个需求&#xff0c;按年份统计销售额。 Prompt&#xff1a;有一个表格C列是年份&#xff0c;D列…

Ubuntu系统硬盘分区攻略(磁盘分区、RAID磁盘阵列、RAID阵列、固态硬盘分区、机械硬盘分区、swap分区、swap交换分区)

文章目录 分区需求分区方案分区顺序相关疑问swap分区不是应该放在最后吗&#xff1f;我安装系统分区的时候&#xff0c;上面有available devices&#xff0c;下面有create software raid(md)&#xff0c;我该用哪个&#xff1f;我available devices下面有个893G的固态&#xff…

前端canvas项目实战——简历制作网站(三)——右侧属性栏(线条宽度样式)

目录 前言一、效果展示二、实现步骤1. 实现线条宽度&#xff08;strokeWidth&#xff09;的属性模块2. 实线线条样式&#xff08;strokeDashArray&#xff09;的属性模块3. 意料之外的“联动” 三、Show u the code后记 前言 上一篇博文中&#xff0c;我们初步实现了右侧属性栏…

Zoho如何使用低代码:赋予人力资源以技术实力

Zoho 为客户提供了一套跨功能产品&#xff0c;从运行简单的调查到简化复杂的企业组织职能&#xff0c;Zoho 几乎提供了企业的业务运行所需的一切。 组织在新的规范和挑战中不断进行扩展&#xff0c;这就不断需要构建可定制的解决方案。这就是为什么除了现成的应用程序之外&…

突破瓶颈!程序员最值得关注的19个顶级油管博主

油管可以说是互联网上最有趣的地方&#xff0c;你可以在这里找到任何你感兴趣的东西。这里也是学习和探索编程世界的绝佳方式。有趣又有才华的技术博主非常多&#xff0c;随时随地都可以与全世界的开发者交流学习。 我们整理了一些在编程领域有影响力的博主&#xff0c;希望能给…

BUUCTF-Real-[PHP]XXE

目录 1、原理 2、XXE漏洞产生的原因 3、开始复现 paylaod 复现 4、flag 1、原理 XML数据在传输过程中&#xff0c;攻击者强制XML解析器去访问攻击者指定的资源内容&#xff08;本地/远程&#xff09;&#xff0c;外部实体声明关键字SYSTEM会令XML解析器读取数据&#xf…

idea创建golang项目

目录 1、设置环境 2、创建项目 3、设置项目配置 4、初始化项目 5、安装本项目的外部依赖包 6、运行项目 7、访问页面查看结果 1、设置环境 1 启用 Go Modules 功能go env -w GO111MODULEon 2. 阿里云go env -w GOPROXYhttps://mirrors.aliyun.com/goproxy/,direct上述命…

【Pwn | CTF】BUUCTF test_your_nc1

天命&#xff1a;时隔两年&#xff0c;又杀回了pwn这里 拿到题目的提示&#xff0c;测试你的nc工具 这题直接连接就可以了&#xff0c;windows装了nc工具&#xff0c;直接耍 nc node5.buuoj.cn 28930 下面给一点nc命令的解释&#xff0c;文心一言得出来的 nc命令是一个用于网…

设计模式篇---备忘录模式

文章目录 概念结构实例总结 概念 备忘录模式&#xff1a;在不破坏封装的前提下捕获一个对象的内部状态&#xff0c;并在该对象之外保存这个状态&#xff0c;像这样可以在以后将对象恢复到原先保存的状态。 就好比我们下象棋&#xff0c;下完之后发现走错了&#xff0c;想要回退…

指针的深入了解6

1.回调函数 回调函数就是一个通过函数指针调用的函数。 如果你把函数的指针&#xff08;地址&#xff09;作为参数传递给另一个函数&#xff0c;当这个指针被用来调用其所指向的函数 时&#xff0c;被调用的函数就是回调函数。回调函数不是由该函数的实现方直接调用&#xff0…

openssl3.2 - 测试程序的学习 - 准备openssl测试专用工程的模板

文章目录 openssl3.2 - 测试程序的学习 - 准备openssl测试专用工程的模板概述笔记工程中需要的openssl的库实现补充 - 最终的模板工程END openssl3.2 - 测试程序的学习 - 准备openssl测试专用工程的模板 概述 openssl3.2 - 测试程序的学习 整了几个test.c, 每开一个新的测试工…

架构设计 高性能带来的复杂度

架构设计的主要目的是为了解决软件系统复杂度带来的问题。 复杂度来源之一就是软件的高性能。 对性能孜孜不倦的追求是整个人类技术不断发展的根本驱动力。例如计算机&#xff0c;从电子管计算机到晶体管计算机再到集成电路计算机&#xff0c;运算性能从每秒几次提升到每秒几…

【华为 ICT HCIA eNSP 习题汇总】——题目集11

1、某公司的内网用户采用 NAT 技术的 NO-pat 方式访问互联网&#xff0c;若所有的公网地址均被使用&#xff0c;则后续上网的内网用户会&#xff08;&#xff09;。 A、挤掉前一个用户&#xff0c;强制进行 NAT 转换上网 B、将报文同步到其他 NAT 转换设备上进行 NAT 转换 C、自…

【vue2】路由之 Vue Router

文章目录 一、安装二、基础使用1、简单的示例2、动态路由2.1 定义动态路径参数2.2 获取动态路径的参数2.3 捕获所有路由 3、嵌套路由4、编程式的导航4.1 router.push4.2 router.replace4.3 router.go(n) 5、命名路由6、重定向 三、进阶1、导航守卫1.1 全局前置守卫1.2 全局后置…

无需 Root 卸载手机预装软件,精简过的老年机又行了

基础准备 准备目标手机、USB 数据线、以及一台电脑。手机 USB 连接电脑&#xff0c;开发者选项中打开 USB 调试。&#xff08;开发者选项默认隐藏&#xff0c;需要在关于手机中多次点击版本号才能调出&#xff09;。 安装手机驱动&#xff0c;下载安装 ADB 工具包。 开始操作…