【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/188893.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css:两个行内块元素和图片垂直居中对齐

目录 两个行内块元素垂直居中对齐图片垂直居中问题图片和文字垂直居中对齐参考文章 两个行内块元素垂直居中对齐 先看一段代码&#xff1a; <style> .box {width: 200px;height: 200px;line-height: 200px;font-size: 20px;text-align: center;display: inline-block;b…

Xilinx FPGA平台DDR3设计详解(一):DDR SDRAM系统框架

DDR SDRAM&#xff08;双倍速率同步动态随机存储器&#xff09;是一种内存技术&#xff0c;它可以在时钟信号的上升沿和下降沿都传输数据&#xff0c;从而提高数据传输的速率。DDR SDRAM已经发展了多代&#xff0c;包括DDR、DDR2、DDR3、DDR4和DDR5&#xff0c;每一代都有不同的…

搭建Docker

一、概念 云服务器大家肯定不陌生了&#xff0c;相比较传统物理服务器来说他的价格&#xff0c;个性化的配置服务&#xff0c;节省了很多的运维成本&#xff0c;越来越多的企业以及个人开发者更加的青睐于云服务器。有了属于自己的服务器就可以部署搭建自己个人网站了&#xf…

Python实战 | 使用 Python 和 TensorFlow 构建卷积神经网络(CNN)进行人脸识别

专栏集锦&#xff0c;大佬们可以收藏以备不时之需 Spring Cloud实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏&#xff1a;https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏&#xff1a;https:/…

openEuler编译安装nmon性能监控工具及可视化分析工具

ln 介绍 nmon&#xff08;short for Nigel’s Monitor&#xff09;是一个性能分析工具&#xff0c;由蓝色巨人IBM开发&#xff0c;最早用于自家操作系统UNIX&#xff0c;AIX &#xff08;Advanced Interactive eXecutive&#xff09;。现在也能用在Linux上。它可以显示系统的…

STM32--时钟树

一、什么是时钟&#xff1f; 时钟是单片机的脉搏&#xff0c;是系统工作的同步节拍。单片机上至CPU&#xff0c;下至总线外设&#xff0c;它们工作时序的配合&#xff0c;都需要一个同步的时钟信号来统一指挥。时钟信号是周期性的脉冲信号。 二、什么是时钟树&#xff1f; S…

51单片机PCF8591数字电压表数码管显示设计( proteus仿真+程序+设计报告+讲解视频)

PCF8591数字电压表数码管显示 1.主要功能&#xff1a;讲解视频&#xff1a;2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接&#xff08;可点击&#xff09;&#xff1a; 51单片机PCF8591数字电压表数码管设计( proteus仿真程序设计报告讲解视…

设计模式之--原型模式(深浅拷贝)

原型模式 缘起 某天&#xff0c;小明的Leader找到小明:“小明啊&#xff0c;如果有个发简历的需求&#xff0c;就是有个简历的模板&#xff0c;然后打印很多份&#xff0c;要去一份一份展示出来&#xff0c;用编程怎么实现呢&#xff1f;” 小明一听&#xff0c;脑袋里就有了…

【云备份|| 日志 day6】文件业务处理模块

云备份day6 业务处理 业务处理 云备份项目中 &#xff0c;业务处理模块是针对客户端的业务请求进行处理&#xff0c;并最终给与响应。而整个过程中包含以下要实现的功能&#xff1a; 借助网络通信模块httplib库搭建http服务器与客户端进行网络通信针对收到的请求进行对应的业…

IDEA重新choose source

大概现状是这样&#xff1a;之前有个工程&#xff0c;依赖了别的模块基础包&#xff0c;但当时并没有依赖包的源码工程&#xff0c;因此&#xff0c;通过鼠标左键点进去&#xff0c;看到的是jar包里的class文件&#xff0c;注释什么的都去掉了的&#xff0c;不好看。后面有这个…

采用示波器显示扭矩传感器模拟信号

扭矩传感器输出的信号波形通常是模拟电压信号&#xff0c;可以通过示波器等仪器进行分析。扭矩传感器的输出信号波形通常有两种类型&#xff1a;正弦波和方波。 应变片传感器扭矩测量采用应变电测技术。在弹性轴上粘贴应变计组成测量电桥&#xff0c;当弹性轴受扭矩产生微小变…

Oracle(16)Managing Privileges

目录 一、基础知识 1、Managing Privileges管理权限 2、System Privileges 系统特权 3、System Privileges : Example系统权限&#xff1a;示例 4、Who Can Grant or Revoke? 谁可以授予或撤销权限&#xff1f; 5、The PUBLIC 6、SYSDBA and SYSOPER 7、Revoke with A…

分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测

分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测 目录 分类预测 | Matlab实现PSO-LSTM粒子群算法优化长短期记忆神经网络的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现PSO-LSTM粒子群算法优化长短…

Llama2通过llama.cpp模型量化 WindowsLinux本地部署

Llama2通过llama.cpp模型量化 Windows&Linux本地部署 什么是LLaMA 1 and 2 LLaMA&#xff0c;它是一组基础语言模型&#xff0c;参数范围从7B到65B。在数万亿的tokens上训练的模型&#xff0c;并表明可以专门使用公开可用的数据集来训练最先进的模型&#xff0c;而无需求…

KCC@广州与 TiDB 社区联手—广州开源盛宴

10月21日&#xff0c;KCC广州与 TiDB 社区联手&#xff0c;在海珠区保利中悦广场 29 楼召开了一次难忘的开源盛宴。这不仅仅是 KCC广州的又一次线下见面&#xff0c;更代表着与 TiDB 社区及广州技术社区的首次深度合作。 活动的策划与组织由 KCC广州负责人 - 惠世冀、PingCAP 的…

Ocelot:.NET开源API网关提供路由管理、服务发现、鉴权限流等功能

随着微服务的兴起&#xff0c;API网关越来越常见。API网关是连接应用程序和用户之间的桥梁&#xff0c;就像一个交通指挥员&#xff0c;负责处理所有进出应用的数据和请求&#xff0c;确保安全、高效、有序地流通。 今天给大家推荐一个.NET开源API网关。 01 项目简介 Ocelot…

用Powershell实现:删除所有不是与.json文件重名的.jpg文件

# 指定要搜索的目录路径 $directoryPath "C:\path\to\your\directory"# 获取该目录下的所有.jpg和.json文件 $jpgFiles Get-ChildItem -Path $directoryPath -Filter *.jpg $jsonFiles Get-ChildItem -Path $directoryPath -Filter *.json | Select-Object -Expan…

海外媒体发稿:彭博社发稿宣传中,5种精准营销方式

在如今的信息发生爆炸时期&#xff0c;营销方式多种多样&#xff0c;但是充分体现精准营销并针对不同用户群体的需求并非易事。下面我们就根据彭博社发稿营销推广为例子&#xff0c;给大家介绍怎样根据不同用户人群方案策划5种精准营销方式。 1.界定总体目标用户人群在制订精准…

CSS注入的四种实现方式

目录 CSS注入窃取标签属性数据 简单的一个实验&#xff1a; 解决hidden 方法1&#xff1a;jsnode.js实现 侧信道攻击 方法2&#xff1a;对比波兰研究院的方案 使用兄弟选择器 方法3&#xff1a;jswebsocket实现CSS注入 实验实现&#xff1a; 方法4&#xff1a;window…

数据分析实战 | 泊松回归——航班数据分析

目录 一、数据及分析对象 二、目的及分析任务 三、方法及工具 四、数据读入 五、数据理解 六、数据准备 七、模型训练 八、模型评价 一、数据及分析对象 CSV文件&#xff1a;o-ring-erosion-only.csv 数据集链接&#xff1a;https://download.csdn.net/download/m0_7…