Flink(九)【时间语义与水位线】

前言

        2023-12-02-20:05,终于写完啦,最近状态不错。刚写完又收到了她的消息哈哈哈哈,开心。

再去全力打拼一次,奋战一场,就算最后打了败仗也无所谓,至少你留下了足迹。        《解忧杂货店》

1、时间语义 

        Flink 中的时间语义有两个:事件时间和处理时间。事件时间也就是数据产生的时间,通常都是数据自带的一个属性。处理时间则是指数据传输到我们集群被处理的时间。然而,由于在我们分布式系统中,数据在网络中有延迟,以及不同机器的时钟可能不一致,所以处理时间通常都要比事件时间滞后一些。

        比如我们在 8:59:59 产生了一条数据,只考虑网络延迟为 2s,窗口的起始时间为 [8:00:00,9:00:00)。如果以事件时间作为默认的时间语义的话,那么我们的集群一定得等到数据在 9:00:01 才会开始计算输出;而如果以处理时间作为默认的时间语义的话,那么当集群机器的时间达达 9:00:00 时立即进行计算输出。所以,不难发现,使用事件时间会牺牲一定的实时性,而使用处理时间则会失去一定的准确性。

        在实际应用中,事件时间更加常见。一般情况下,业务日志数据都会记录数据生成的时间戳,它就可以作为事件时间的判断基础。

        在 Flink 的早期版本中是以处理时间作为基本语义的,但在 Flink 1.12 之后,考虑到事件时间在实际中更加广泛,所以 Flink 就以事件时间作为默认的时间语义了。

2、水位线(Watermark)

2.1、事件时间和窗口

我们的水位线正是基于事件时间提出来的,所以先梳理一下事件时间和窗口的关系。

在这个窗口的处理过程中,我们是基于数据的时间戳(数据自带时间戳属性),自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

        事件时间完全依赖数据本身,这样可以保证数据的结果绝对准确。也就是说,不管机器时间是多少,我们只以新来数据的时间戳更新时钟。一般的流处理场景中,事件时间可以基本与处理时间保持同步,只是略微有点延迟。

2.2、水位线概念

        在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

        具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置是在某个数据到来之后;这样就可以把这个数据的时间戳抽出来,作为当前水位线的时间戳了。

 上图是理想状态下,数据量小,数据按照有序的状态进入流中,每条数据产生一个水位线

1)有序流中的水位线

然而,实际应用中,数据量非常大,并且数据之间的时间差非常小(几毫秒),如果依然在每条数据后面标记一个水位线,这样的代价是非常大的。所以为了提高效率,一般会每隔一段时间生产一个水位线。这时的水位线就像是一个周期性出现的时间标记。

2)无序流中的水位线

我们知道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变(比如我们多个 Source 的情况下,数据通过不同的节点发送给下游,而由于不同节点网络性能或硬件的差异,3s 产生的数据可能在 1s 产生的数据之前被发送给下游被处理),这就是所谓的“乱序数据”。

上图中,很明显有很多乱序的数据,所以有可能新的时间戳比之前的还小,如果直接将这个时间的水位线再插入,我们的“时钟”就回退了。所以,当我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。但是这样的代价就是,每来一条数据,就去判断一下事件时间是否大于当前水位线时间。

如果考虑到大量数据同时到来的处理效率,明显每个数据比较一次是不可行的。我们同样可以周期性地生成水位线。这时只需要周期性地保存一下该周期内所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线 。

        但是上面的这种方法依然存在问题:我们无法正确处理“迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0-9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确。而解决这种问题的方法也比较简单,就是等一下,也就是说,为了让窗口能够正确的收集迟到的数据,我们可以让窗口等上一段时间,比如 2s。

同样,我们一般都是周期性地生成水位线:

 这里需要特别注意的是,一个窗口所收集的数据,并不是之前所有已经到达的数据,而是真正数据的事件时间在该窗口范围内的。我们需要了解一下水位线和窗口的工作原理:

水位线和窗口的工作原理 (重点)

        我们之前把窗口理解为一个桶,处理完一个范围内的数据后就清空,然后继续下一个窗口。这在处理时间语义下是没有问题的,因为我们并不关心数据的是什么时候产生的,我们只关心数据是什么时候来的,我只保证来一个处理一个,在处理时间范围内处理并输出就好了。但是在事件时间语义下,这种理解是错误的,因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。比如上图中,尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。

        所以我们的每个窗口都是一个桶,每次收集数据时它只会取走属于自己窗口内的数据,当达到窗口的结束时间(比如等待 2s 的情况下,窗口 [0,10)的结束时间就是 12,也就是说当来一条 事件时间为 11s 的数据时,我们认为当前的时间达到了 w(11-2)=9,当来一条事件时间为 12s 的数据时 w(12-2)=10 ,10 已经达到了我们的窗口关闭时间,这事就说明事件时间在 10 之前的数据都已经到齐了,窗口[0,10) 也就会关闭了)时,就对桶内的数据进行计算处理。

注意:窗口是我们属于窗口范围内的第一条数据到来的时候现 new 的,也就是动态创建的,而不是静态创建好的。 

3)水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

2.3、生成水位线

1)生成水位线的总体原则

        完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该怎么做呢?由于网络传输的延迟不确定(节点挂了,网络异常),为了获取所有迟到数据,保证计算结果完全正确,必须等待足够长的时间,但这会带来更高的延迟

        如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。

        当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义(毕竟不在乎数据准确性也就无所谓迟到),这在理论上可以得到最低的延迟。

        所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。接下来我们就具体了解一下水位线在代码中的使用。

2)水位线生成策略

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)

这里的 WatermarkStrategy 是一个接口,它包含了一个 “时间戳分配器” 和一个“水位线生成器”。 

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);

至于为什么要有时间戳分配器,这是因为原始数据中的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配了。

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 时间戳分配器@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 水位线生成器@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

3)Flink 内置水位线策略

 1、有序流中内置水位线设置

我们来演示一个水位线驱动的滚动窗口(注意:这里的水位线是事件时间语义下的),这里演示的是有序流。

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

public class WaterMarkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成:泛型方法,需要指定数据类型,升序的watermark 没有等待时间.<WaterSensor>forMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {   //函数接口 可以用lambda表达式@Overridepublic long extractTimestamp(WaterSensor sensor, long recordTimestamp) {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000; // 返回的时间戳单位是 ms}}));KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);// todo 1. 指定窗口分配器:基于事件时间的滚动窗口 watermark 才能起作用WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingEventTimeWindows.of(Time.seconds(10)));// todo 2. 指定窗口函数:增量聚合的规约函数SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

上面的代码中,我们把 WaterSensor 的 ts 属性当做数据自带的事件时间,因为单位是毫秒,所以我们 *1000。withTimestampAssigner()中的参数里的 recordTimeStamp 的默认值为 Long.MIN_VALUE,一般场景用不到。

测试输入:

s1,1,1
s1,2,2
s1,3,3
s1,5,5
s1,9,9
s1,10,10
s1,20,20

输出结果:

数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=2, vc=2},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=3, vc=3},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=5, vc=5},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=9, vc=9},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=10, vc=10},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含5条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=3, vc=3}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}]
数据=WaterSensor{id='s1', ts=20, vc=20},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含1条数据===>[WaterSensor{id='s1', ts=10, vc=10}]

 可以看到,我们设置的窗口大小为 10 s,所以当WaterSensor{id='s1',ts=10,vc=10}来的时候才触发窗口计算输出并关闭。我们的窗口是左闭右开的。而且窗口并不会把不属于该窗口的数据包含进去。

2、乱序流中内置水位线设置

        由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

这里我们继续使用滚动窗口来演示:
我们只需在上面代码的基础上修改:
 

SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))   // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));

这里我们设置等待时间为 2s。

测试输入:

s1,1,1
s1,2,2
s1,5,5
s1,7,7
s1,9,9
s1,10,10
s1,3,3
s1,11,11
s1,12,12

输出结果:

数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=2, vc=2},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=5, vc=5},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=7, vc=7},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=9, vc=9},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=10, vc=10},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=3, vc=3},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=11, vc=11},recordTs=-9223372036854775808
数据=WaterSensor{id='s1', ts=12, vc=12},recordTs=-9223372036854775808
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含6条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}, WaterSensor{id='s1', ts=3, vc=3}]
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据===>[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=11, vc=11}, WaterSensor{id='s1', ts=12, vc=12}]

可以看到我们数据的事件时间达到10s时,窗口仍然没有关闭,此时依然可以接受迟到的数据,直到大于(等待时间+窗口关闭时间 = 12s)的数据来的时候,才会触发窗口计算关闭。

3、内置水位线原理
1)乱序流中水位线的生成原理

对于我们上面的乱序流中生成水位线原理,我们可以查看 <WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) 的底层源码:

 2)有序流中水位线的生成原理

同样,我们查看 <WaterSensor>forMonotonousTimestamps() 方法的源码:

它也是返回一个对象,我们继续查看: 

我们发现,有序水位线它的底层仍然是乱序水位线,只不过它的等待时间为 0ms 。

总结

内置水位线的生成原理:

  • 都是周期性生产的:默认是 200ms(可以通过 env.getConfig().setAutoWatermarkInterval() 查看默认的水位线生成周期)
  • 有序流:watermark = 当前最大事件时间 - 0 ms
  • 乱序流:watermark = 当前最大事件时间 - 等待时间(也叫乱序程度) -1 ms
4、自定义水位线策略
1)周期性水位线生产策略

周期时间我们一般是不去随便修改的,默认为 200 ms。

下面我们模仿 Flink 的内置乱序流水位线策略来自定义一个水位线生成器:

public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {private long maxTs; // 保存到当前为止最大的事件时间private long delayTs;   // 保存等待时间public MyPeriodWatermarkGenerator(long delayTs) {this.maxTs = Long.MIN_VALUE + this.delayTs + 1;this.delayTs = delayTs;}/*** 每条数据来都会调用一次,用来提取最大的事件时间* @param event* @param eventTimestamp 提取到的事件时间* @param output*/@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs = Math.max(maxTs,eventTimestamp);System.out.println("调用 onEvent 方法,获取当前最大的时间戳="+maxTs);}/*** 周期性调用: 生成 watermark* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTs - delayTs - 1));System.out.println("调用onPeriodicEmit方法,生成watermark="+(maxTs - delayTs - 1));}
}

测试:

// 这里为了测试 一般不去修改水位线生成的周期时间
env.getConfig().setAutoWatermarkInterval(2000);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定自定义的watermark生成器.<WaterSensor>forGenerator(ctx -> new MyPeriodWatermarkGenerator<>(3000))// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));

我们可以发现,onPeriodEmit()方法是每周期执行一次。

2)断点式水位线生成器

断点式和周期式唯一的不同就是发送水位线的方法,上面的周期式中,我们使用 onPeriodicEmit()方法来周期性地发送水位线,而断电式则由 onEvent() 来发送水位线,也就是只要有新的一条数据来,它就会更新水位线。具体代码只需要修改以下部分:

@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs = Math.max(maxTs,eventTimestamp);output.emitWatermark(new Watermark(maxTs - delayTs -1));System.out.println("调用 onEvent 方法,获取当前最大的时间戳="+maxTs+"生成watermark="+(maxTs - delayTs - 1));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 啥也不用干}

对于我们之前的 Kafka 数据源,我们现在可以指定它的水位线生产策略了:

env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)),"kafkaSource").print();

注意:水位线策略的设置只需要设置一次!Kafka 数据源不需要设置时间戳读取器(也就是如何从数据源读取事件时间),因为对于 Kafka 数据源,框架可以直接从 Source 中获取事件时间。

2.4、水位线的传递

        我们知道,水位线是数据流中插入的一个标记,用来表示事件时间的进展。它随着数据一起在任务间传递。

        在直通式(forward)传输的情况下,数据和水位线都是按照本身的顺序依次传递、依次处理的。一旦水位线到达了算子任务,该任务就会将它内部的时钟设为这个水位线的时间戳。

        然而,实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,要求上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖原始数据中的时间戳(避免数据经过转化处理后发生改变),也可以知道当前事件时间了。

        还有一个问题就是,在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步(有的子任务处理的数据的事件时间早,有的任务处理的的数据的事件时间晚,所以也就使得每个子任务的水位线时间戳有的快有的慢,也就使得不同子任务的逻辑时钟不同步),所以同一时刻发给下游任务的水位线可能并不相同。这个时候下游就要确定到底按照谁发来的水位线来确定为当前事件的最新进展,答案是最小的水位线,因为我们水位线的本质就是 “保证当前时间之前的数据,都已经到齐了”。

        此外,多并行度情况下,我们的一条数据通常只会去往一个分区(分区就是子任务),但是我们的水位线是特殊的,它会广播到所有下游节点,来推进整个事件的进展。还需要注意的是,多并行度的情况下往往会对我们的水位线有影响,比如我们设置的等待时间为 3s,但当事件时间为 13 的数据到来后,它并不会立即关闭窗口,因为在多并行度下,水位线的更新是取最小的(取的是两个上游任务中的最小),比如:

上游并行任务(等待3s)            水位线
map1 -> 1-> 一条数据无法取最小
map2 -> 3-> 取最小=1       -2
map1 -> 5      -> 取最小=3        0  
map2 -> 7-> 取最小=5        2
map1 -> 13  -> 取最小=7        4   
map2 -> 14-> 取最小=13       10   

2.5、设置空闲等待(Idleness)

        在多个并行度的情况下,我们知道,水位线的更新需要至少通过两个上游并行任务的数据的事件时间来比较。而加入一个上游中只有一条数据会出现什么情况呢:

上游任务(等待3s)        事件时间             水位线
map1 -> 1-> 一条数据无法取最小
map2 -> 2-> 取最小=1                        -2
map1 -> 3    -> 取最小=2                        -1
map1 -> 5-> 一条数据无法取最小(还需要一条map2的数据)
map1 -> 7-> 一条数据无法取最小(还需要一条map2的数据)
map1 -> 13-> 一条数据无法取最小(还需要一条map2的数据)

可以看到,这样就会造成我们的逻辑时钟(水位线)迟迟无法推进,怎么解决呢?就是当我们的一个上游并行任务不再有数据到来时,我们下游任务不再等待。

public class WatermarkIdlenessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 自定义分区器 把奇数和偶数分区到两个不同的map子任务// 输入的数字就是事件时间*1000msSingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("localhost", 9999).partitionCustom(new MyPartitioner(), num -> num)    //根据自己来进行分区.map(Integer::parseInt)// todo 指定 watermark 策略.assignTimestampsAndWatermarks(WatermarkStrategy// 使用有序流的watermark生成器 升序.<Integer>forMonotonousTimestamps()// 指定如何从数据中提取事件时间.withTimestampAssigner((num, ts) -> num * 1000L)// 空闲等待时间 5s.withIdleness(Duration.ofSeconds(5)));SingleOutputStreamOperator<String> process = socketDS.keyBy(num -> num % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

上面的代码中,我们的并行度为2,由于数据源是 Socket ,所以 Source算子并行度只能为 1;而 输入的数据由于我们指定了 MyPartitioner 所以它会按照把奇数和偶数分到不同的 map算子;

        在水位线传递的过程中,当上游没有偶数传递时,处理奇数的process算子需要等待偶数数据到来才能确定窗口的关闭时间。这是因为水位线的生成是基于事件时间的,而事件时间是根据数据本身的时间戳来计算的。处理奇数的process算子虽然只处理奇数数据,但是它需要等待偶数数据到来以便根据偶数数据的时间戳来确定窗口的关闭时间。如果处理奇数的process算子不等待偶数数据到来就关闭窗口,那么可能会出现数据丢失或计算结果不正确的情况。

2.6、迟到数据的处理

        之前我们说,通过设置等待时间可以解决一定的数据乱序问题,但并不是 100% 的解决,因为往往不会把等待时间设置的太久(会造成计算的延迟),所以考虑到一些数据乱序程度无法预知,光靠等待时间是不行的(会造成结果不准确)。解决数据乱序问题我们除了设置等待时间,其实还有两招:设置窗口延迟关闭 和 使用侧输出流接收延迟数据。

2.6.1、设置窗口延迟关闭

        我们可以在 window() 方法之后 .allowedLateness(Time.seconds(2)) 来设置关窗时间为 2s。窗口的触发计算和关闭是两码事,我们之前都是触发计算后直接关闭,这里我们设置延迟关闭 2s,也就是说,当有数据的事件时间达到窗口最大值,窗口被触发计算一次,但不会立即关闭,而是允许再多等一会,但是如果出现有比窗口最大关闭时间还要大2s的数据来时,窗口直接关闭。

public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))   // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));SingleOutputStreamOperator<String> process = sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2))   //设置运行窗口延迟关闭2s.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

测试输入:

s1,1,1
s1,2,2
s1,10,10
s1,12,12
s1,6,6
s1,3,3
s1,14,14
s1,5,5
s1,3,3

运行结果:

key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含2条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}]
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含3条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}]
key=s1 的窗口[1970-01-01 08:00:00,1970-01-01 08:00:10]包含4条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}, WaterSensor{id='s1', ts=3, vc=3}]
key=s1 的窗口[1970-01-01 08:00:10,1970-01-01 08:00:20]包含3条数据===>[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=12, vc=12}, WaterSensor{id='s1', ts=14, vc=14}]

可以看到,当数据 "s1,12,12" 到来时,窗口触发计算一次,但没有立即关闭,所以之后迟到的 "s1,6,6," 和 "s1,3,3" 仍然可以触发计算,但是当大于窗口最大关闭时间+2s(允许迟到的时间)的数据 "s1,14,14" 到来后,窗口彻底关闭,之后到来的 "s1,5,5" 和 "s1,3,3" 无法进行计算。 

2.6.2、使用侧输出流接收延迟数据

流式数据没有 100% 的完美,数据迟到不可能彻底解决,为了尽可能让结果正确,让极端迟到的数据仍然能够计算,我们还可以使用侧输出流。

public class WaterMarkAllowLaterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 9999).map(new WaterSensorFunction())// todo 指定 watermark 策略,我们直接使用实现好的.assignTimestampsAndWatermarks(WatermarkStrategy// 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))   // 等待2s// 指定如何从数据中提取事件时间.withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms}));// 定义侧输出流OutputTag<WaterSensor> lateData = new OutputTag<>("lateData", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDS.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2))   //设置运行窗口延迟关闭2s.sideOutputLateData(lateData)  // 关窗后的迟到数据放到侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String start = sdf.format(new Date(startTs));String end = sdf.format(new Date(endTs));long size = elements.spliterator().estimateSize();out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流并打印process.getSideOutput(lateData).printToErr();env.execute();}
}

测试输入:

s1,1,1
s1,2,2
s1,12,12
s1,5,5
s1,7,7
s1,14,14
s1,1,1
s1,2,2

运行结果:

2.7、迟到数据总结

2.7.1、乱序和迟到的区别
  • 乱序:数据的顺序乱了,事件时间小的数据 比 事件时间大的数据 晚来
  • 迟到:数据的事件时间 < 水位线时间,窗口关闭了才来
2.7.2、迟到数据的处理
  1. 设置乱序等待时间
  2. 如果开窗,设置窗口允许迟到,延迟关闭窗口
  3. 关窗后的数据放到侧输出流

对数据的延迟时间要做到心中有数

  • 等待时间,设置一个不是特别大的,一般都是秒级,在 乱序和延迟中做取舍
  • 允许迟到时间(窗口延迟关闭时间),置考虑大部分的迟到数据
  • 极端迟到数据放到侧输出流,最后单独拿出来合并一下就好了

        耗费三四天时间终于把这一块学完了,时间语义是非常重要的内容,需要好好理解记忆,也要知道怎么通过代码实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/211537.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

webpack学习-1.起步

webpack学习-1.起步 1.基础设置2.配置文件的引入3.总结 1.基础设置 首先 webpack是干嘛的呢&#xff0c;用官网的一张图 Webpack 是一个现代的静态模块打包工具。它主要用于将前端应用程序中的各种资源&#xff08;例如 JavaScript、CSS、图片等&#xff09;打包成一个或多个…

Linux 进程地址空间

文章目录 进程地址空间进程地址空间结构页表虚拟内存写时拷贝 进程地址空间 进程地址空间难以定义&#xff0c;因为它更像是一个中间件。 程序从磁盘中加载到内存&#xff0c;程序的执行需要硬件资源&#xff0c;所以每个程序启动时会创建至少一条进程&#xff0c;进程作为组…

销售人员如何拓展客户?

销售这个职业每年都会有很多新人来&#xff0c;也有很多老人转行。大多数人转行的原因无非就是找不到客户&#xff0c;没有完成业绩导致工资不理想&#xff0c;性格不合适无法开口交流不会社交等等。 既然有很多人因为找不到客户而放弃&#xff0c;那么对于一个销售来说&#…

k8s官方镜像代理加速

背景 大家可能在云原生领域需要部署周边的一些生态组件时&#xff0c;在国内遇到无法正常拉取镜像&#xff0c;显得就有点苦恼&#xff0c;不过没关系&#xff0c;常见的${{ registry_name }} 例如 “gcr.io”&#xff0c;“registry.k8s.io” Failed to pull image “registry…

[ffmpeg] aac 音频编码

aac 介绍 aac 简单说就是音频的一种压缩编码器&#xff0c;相同音质下压缩比 mp3好&#xff0c;目前比较常用。 aac 编码支持的格式 aac 支持的 sample_fmts: 8 aac 支持的 samplerates: 96000 88200 64000 48000 44100 32000 24000 22050 16000 12000 11025 8000 7350 通…

git push 报错 error: src refspec master does not match any 解决

git报错 ➜ *** git:(main) git push -u origin "master" error: src refspec master does not match any error: failed to push some refs to https://gitee.com/***/***.git最新版的仓库初始化后 git 主分支变成了 main 方法 1.把 git 默认分支名改回 master …

maven生命周期回顾

目录 文章目录 **目录**两种最常用打包方法&#xff1a;生命周期&#xff1a; 两种最常用打包方法&#xff1a; 1.先 clean&#xff0c;然后 package2.先 clean&#xff0c;然后install 生命周期&#xff1a; 根据maven生命周期&#xff0c;当你执行mvn install时&#xff0c…

Python函数

1.函数 1.1 函数概述 函数定义和优势 不同形状正方形打印 # 2个 for i in range(0, 2):for j in range(0, 2):print("*", end"")print() # 3个 for i in range(0, 3):for j in range(0, 3):print("*", end"")print() # 4个 for i …

Linux:dockerfile编写搭建nginx练习(8)

dockerfile是创建镜像的一种&#xff0c;通过已有镜像的基础上再在上面部署一些别的。 在这个基础镜像上搭建&#xff0c;我这个是一个空的centos镜像 我这里用http的yum仓库存放了nginx和rpm包 创建dockerfile vim Dockerfile写入#设置基础镜像 FROM centos#维护该镜像的用户…

redis------在java中操作redis

Redis&#xff08;非关系型数据库&#xff09;简介 redis下载 点击即可进入redis中文网进行下载 百度网盘windows版本 提取码 DMH6 redis主要特点 基于内存存储&#xff0c;读写性能高 适合存储热点数据&#xff08;热点商品、资讯、新闻&#xff09; 企业应用广泛 redis不同…

SQL Server 2016(创建数据库)

1、实验环境。 某公司有一台已经安装了SQL Server 2016的服务器&#xff0c;现在需要新建数据库。 2、需求描述。 创建一个名为"db_class"的数据库&#xff0c;数据文件和日志文件初始大小设置为10MB&#xff0c;启用自动增长&#xff0c;数据库文件存放路径为C:\db…

Gti GUI添加标签

通过Git Gui打开项目&#xff0c;通过菜单打开分支历史&#xff0c;我这里是名为"develop"的分支 选中需要打标签的commit&#xff0c;右键-Create tag即可 但貌似无法删除标签&#xff0c;只能通过git bash

linux NAT网卡配置static

由于是内网&#xff0c;资料无法拷贝&#xff0c;借助参考资料&#xff0c;整理发出。 镜像安装 基本操作。 查看VM配置 图1&#xff0c;有几个信息。一个是NAT借用了网卡里的VMnet8适配器。 子网IP是从192.168.142.0 子网掩码255.255.255.255&#xff0c;对应下面配置的N…

CoreDNS实战(五)-接入prometheus监控

1 背景 Prometheus插件作为coredns的Plugins&#xff0c;默认情况下是内置在coredns中&#xff0c;如果是自己编译安装的版本&#xff0c;需要注意在编译安装的时候的plugin.cfg文件中添加了prometheus:metrics&#xff0c;这样才能确保编译成功。 # 首先我们检查一下运行的版…

【从零认识ECS云服务器 | 快速上线个人网站】二、使用ECS云服务器

第二章 使用ECS 2.1 获取ECS 方式一&#xff1a;通过试用中心免费领取ECS实例 满足以下全部条件的阿里云用户&#xff0c;可免费试用云服务器ECS&#xff1a; 阿里云注册会员用户并完成阿里云企业认证或个人认证用户。申请用户是云服务器ECS产品的新用户&#xff0c;可以申…

【链表Linked List】力扣-2 两数相加

目录 题目描述 解题过程 题目描述 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 …

Vue学习计划-Vue2--Vue核心(二)Vue代理方式

Vue data中的两种方式 对象式 data:{}函数式 data(){return {} }示例&#xff1a; <body><div id"app">{{ name }} {{ age}} {{$options}}<input type"text" v-model"value"></div><script>let vm new Vue({el: …

[JavaScript前端开发及实例教程]计算器井字棋游戏的实现

计算器&#xff08;网页内实现效果&#xff09; HTML部分 <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>My Calculator&l…

【头歌系统数据库实验】实验6 SQL的多表查询-2

目录 第1关&#xff1a;查询每个选手的信息及其提交的解答信息&#xff0c;没做题的选手不显示 第2关&#xff1a;查询做了1001题且耗时大于500&#xff08;time&#xff09;的选手信息 第3关&#xff1a;查询所有选手信息及其提交的解答信息&#xff0c;没做题的选手也要显…

css所有属性介绍

文章目录 1️⃣ CSS属性介绍1.1 CSS3 动画属性&#xff08;Animation&#xff09;1.2 CSS 背景属性&#xff08;Background&#xff09;1.3 CSS 边框属性&#xff08;Border 和 Outline&#xff09;1.4 Box 属性1.5 Color 属性1.6 Content for Paged Media 属性1.7 CSS 尺寸属性…