Flink中的时间和窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。

一.窗口(Window)

1.1 窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

正确理解:在Flink中,窗口其实并不是一个“框”,应该把窗口理解成一个“桶”。在Flink中,窗口可以把流切割成有限大小的多个“存储桶” (bcket): 每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

且Flink 中的窗口并不是事先创建好的,而是动态创建的。当有落在窗口范围中的数据到达时才会创建对应的窗口。

例如需要将数据按照时间进行统计计算,就可以将数据按小时进行分桶,0点~1点放在一个桶中,1点~两点放到一个桶中。

窗口是由窗口分配器和窗口函数组成的。

1.2 窗口的分类

Flink 中除了最简单的时间窗口外,还可以使用各种不同类型的窗口来实现需求。

1.2.1 按照驱动(度量)类型分

窗口其实截取有界流的一种方式,如何定义截取的开始时机和结束时机,这就叫做窗口的驱动类型。

(1) 时间窗口(Time Window)

时间窗口就是以时间点来定义窗口的开始和结束,截取出的就是某一时间段的数据。到达结束时间,窗口则不再继续收集数据,触发计算输出结果,并将窗口销毁关闭。

时间窗口并不是以第一条数据来的时间+窗口长度为一个窗口,而且整数向下取整。

例如:

一个基于时间的窗口,且窗口长度为7。

1分12秒一条数据达到,其实这条数据不属于 [ 12 ,19 ),而是属于 [ 10,20 )。

(2) 计数窗口(Count Window)

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。

1.2.2 按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)  滚动窗口(Tumbling Window)

滚动窗口有固定的大小,是一种对数据的“均匀切分”的划分方式。窗口间不会重叠,也不会产生间隔,每个数据只会属于一个窗口。

滚动窗口可以根据时间和数据个数定义,需要的参数就是窗口大小(window size)。例如可以定义长度为1小时的滚动窗口,则每小时会进行一次统计,也可以定义一个长度为10的滚动计数窗口,则每10个数会进行一次统计。

应用:对每个时间段做聚合统计。

(2) 滑动窗口(Sliding Window)

滑动窗口的大小也是固定的,当窗口间并不一定是无缝连接的,可以错开一定的位置。

定义滑动窗口的参数有两个:除去窗口大小 (window size)之外,还有一个“滑动步长”(window slide)它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

4b19a6aefdd84b69b8f1df4b61692ad2.png

当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数就由窗口大小和滑动步长的比值 (size/slide) 来决定。
滚动窗口也可以看作是一种特殊的滑动窗口一-窗口大小等于滑动步长 (size =slide)。

滑动窗口适合计算结果更新频率非常高的场景。

同样的,滑动窗口也支持以时间和数据个数来定义。

(3) 会话窗口(Session Window)

会话窗口,是基于“会话” (session) 来来对数据进行分组的。会话窗口只能基于时间来定义

会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果两条数据达到的间隔小于定义的会话超时时间,那为保持会话,数据都属于同一个窗口;如果两条数据达到的间隔大于定义的会话超时时间,则为两个不同的会话,数据也就不在一个窗口。

(4) 全局窗口(Global Window) 

这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,
默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”
(Tiigger)

1.2.3 四种时间窗口的演示

(1)  滚动窗口

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWs.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/**** @param s The key for which this window is evaluated. 该窗口的 Key* @param context The context in which the window is being evaluated. 窗口上下文* @param elements The elements in the window being evaluated.  窗口中所有的数据* @param out A collector for emitting elements.    采集器* @throws Exception*/@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}});process.print();env.execute();}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
s1,6,6

结果: 

e4dcf37acee1497c8d0413ebe1be4e0c.png

 (2) 滑动窗口

// 2、 滑动窗口,窗口长度为10s,滑动步长为 5s (窗口重叠 5s)
WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)));

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
s1,6,6
s1,7,7

 输出:

787271a483cf4d4a9e486c48cd1ad49f.png

(3) 会话窗口 

// 3、会话窗口,会话超时时间为 10s
WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3// 等待10ss1,4,4
s1,5,5
s1,6,6// 等待十秒s1,7,7

输出:

eb361de4008d49e8beccc2ef89766dd4.png

 (4) 动态会话窗口

// 4、动态会话窗口,可以动态指定会话超时时间
WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<WaterSensor>() {@Overridepublic long extract(WaterSensor element) {// 根据数据中的属性自定义指定会话超时间,会话单位是毫秒// 以数据中的 vc * 1000 毫秒为会话超时间return element.getVc() * 1000;}}
));

1.2.4 两种计数窗口的演示

(1) 滚动窗口

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 计数窗口// 滚动窗口:每3条为一个窗口WindowedStream<WaterSensor, String, GlobalWindow> sensorWs = sensorKS.countWindow(3);SingleOutputStreamOperator<String> process = sensorWs.process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {out.collect("窗口包含==>" + elements.toString() + "==>" + elements.spliterator().estimateSize() + "条数据");}});process.print();env.execute();
}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3,
s1,4,4
s1,5,5
s1,6,6

输出:

e77d950e982b4042815b8a99a2d0aa67.png

(2) 滑动窗口

// 滑动窗口:窗口长度为3,滑动步长为1
WindowedStream<WaterSensor, String, GlobalWindow> sensorWs = sensorKS.countWindow(3,1);

输入:

[root@VM-55-24-centos ~]# nc -lk 8877
s1,1,1
s1,2,2
s1,3,3
s1,4,4

输出:

1e0709e265f94186a07e11391a4cd2af.png

1.3 窗口API概览

(1) 按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先要确定数据流有没有进行 KeyBy 操作。

(1.1) 非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

基于DataStream调用.windowAll()定义窗口

stream.windowAll(...)

(1.2) 按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

例如有 Key分别为红、黄、蓝的三种数据,需要按照时间分桶,则在1点~2点之间,红、黄、蓝会各自单独创建一个桶,桶与桶之间互不干扰,到下一个时间点,则会各自创建对应的桶。

需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...).window(...)

(2) 代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)窗口函数(Window Functions)

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种

1.4 窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型

窗口按照驱动类型可以分成时间窗口计数窗口,而按照具体的分配规则,又有滚动窗口滑动窗口会话窗口全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

窗口分配器就是根据是否进行了 KeyBy 操作,直接调用 window() / windowAll()。

... sensorKS = sensorDS.keyBy(WaterSensor::getId);
// 1.1 基于时间的窗口
// 滚动窗口,窗口长度为10s,每10s的数据在一个窗口内
sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// 滑动窗口,窗口长度为10s,滑动步长为2s(窗口重叠2s)
sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)));
//  会话窗口,会话间隔为10s
sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));// 1.2 基于计数的窗口
//  滚动窗口,窗口长度为10个元素(每10个元素在一个窗口内)
sensorKS.countWindow(10); 
// 滑动窗口,窗口长度为10个元素,滑动步长为2个元素
sensorKS.countWindow(10,2); 
// 全局窗口,计数窗口的底层实现,自定义窗口时使用
sensorKS.window(GlobalWindows.create()); 

1.5 窗口函数

第一步用窗口分配器将数据收集在窗口中后,则需要定义窗口函数对窗口收集的数据进行计算操作。

db205ab642014be2b3041472f1e8a7f7.png

窗口函数根据处理的方式可以分为两类:增量聚合函数全窗口函数

1.5.1 增量聚合函数(ReduceFunction / AggregateFunction)

窗口将数据收集起来,最基本的处理操作就是进行聚合。每来一条数据,就在之前的结果上聚合一次,这就是“增量聚合”。

典型的增量聚合函数有两个:ReduceFunctionAggregateFunction

(1) 规约函数(ReduceFunction)

需求案例:读取 Socket 的水位数据,计算每30s中的VC 累加和 , 并在窗口触发时输出结果。

/*** 窗口函数:增量聚合 Reduce*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 1、指定 窗口分配器 使用滚动窗口,窗口长度为30s,每30s的数据在一个窗口内WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));// 2、窗口函数: 增量聚合 Reduce/***  窗口函数的 Reduce :*      1、相同的 Key 的第一条数据来的时候,不会调用 Reduce 方法*      2、增量聚合的意思是:数据来一条就基于上次的结果计算一次,但不会输出*      3、在窗口触发(结束)时,才会输出窗口的最终计算结果*/SingleOutputStreamOperator<WaterSensor> reduce = sensorWs.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor v1, WaterSensor v2) throws Exception {System.out.println("调用 Reduce 方法:上一条数据:" + v1 + "-----当前数据:" + v2);return new WaterSensor(v1.getId(), v1.getTs(), v1.getVc() + v2.getVc());}});// 输出窗口计算结果reduce.print();env.execute();}
}

输入:

/***  30s内输入完成*     *  因为使用的KeyBy,相同Key才会被分配到一个窗口中,所以id要一致  *      */
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
s1,6,6

结果:

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=1}-----当前数据:WaterSensor{id='s1', ts=2, vc=2}
调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=3}-----当前数据:WaterSensor{id='s1', ts=3, vc=3}
调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=6}-----当前数据:WaterSensor{id='s1', ts=4, vc=4}
调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=10}-----当前数据:WaterSensor{id='s1', ts=5, vc=5}
调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=15}-----当前数据:WaterSensor{id='s1', ts=6, vc=6}
WaterSensor{id='s1', ts=1, vc=21}

Reduce小结:

  1. 相同的 Key 的第一条数据来的时候,不会调用 Reduce 方法

  2. 增量聚合的意思是:数据来一条就基于上次的结果计算一次,但不会输出

  3. 在窗口触发(结束)时,才会输出窗口的最终计算结果

(2) 聚合函数(AggregateFunction)

ReduceFunction可以解决大多数归约聚合的问题,而AggregateFunction可以看作是ReduceFunction的通用版本,所以AggregateFunction使用更为灵活,在ReduceFunction中,输入类型、中间状态存储类型、输出类型都必要保持类型一致,而AggregateFunction有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT),三种类型都可以不同

与上面需求一致:

/*** 窗口函数:增量聚合 Aggregate */
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 1、指定 窗口分配器 使用滚动窗口,窗口长度为30s,每30s的数据在一个窗口内WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));// 2、窗口函数: 增量聚合 Aggregate/*** 1、本窗口的第一条数据达到时,创建窗口、初始化累加器* 2、增量聚合:数据来一条计算一次(调用add方法)* 3、窗口输出时调用一次getResult方法* 4、输入类型、累加器、输出类型 三者可以类型不一致*/SingleOutputStreamOperator<String> aggregate = sensorWs.aggregate(new AggregateFunction<WaterSensor, Integer, String>() { // 输入类型, 累加器类型(存储中间计算值), 输出类型/***  初始化累加器*/@Overridepublic Integer createAccumulator() {System.out.println("初始化累加器");return 0;}/***  具体的聚合逻辑*/@Overridepublic Integer add(WaterSensor waterSensor, Integer integer) {System.out.println("调用add方法 当前数据:" + waterSensor);return integer + waterSensor.getVc();}/***  窗口触发时获取最终计算结果并输出*/@Overridepublic String getResult(Integer integer) {System.out.println("调用getResult方法");return integer.toString();}@Overridepublic Integer merge(Integer integer, Integer acc1) {// 只有会话窗口才会调用System.out.println("调用merge方法");return null;}});// 输出窗口计算结果aggregate.print();env.execute();}
}

输入:

/***  30s内输入完成 */[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5
s1,6,6
s1,7,7

输出结果:

初始化累加器
调用add方法 当前数据:WaterSensor{id='s1', ts=1, vc=1}
调用add方法 当前数据:WaterSensor{id='s1', ts=2, vc=2}
调用add方法 当前数据:WaterSensor{id='s1', ts=3, vc=3}
调用add方法 当前数据:WaterSensor{id='s1', ts=4, vc=4}
调用add方法 当前数据:WaterSensor{id='s1', ts=5, vc=5}
调用add方法 当前数据:WaterSensor{id='s1', ts=6, vc=6}
调用add方法 当前数据:WaterSensor{id='s1', ts=7, vc=7}
调用getResult方法
28

Aggregate 小结:

1.三个需实现的接口,一个会话窗口需实现的接口      

createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。

merge():合并两个累加器,并将合并后的状态作为一个累加器返回。(会话窗口使用)

getResult():从累加器中提取聚合的输出结果。

add():将输入的元素添加到累加器中。

2.本窗口的第一条数据达到时,创建窗口、初始化累加器

3.增量聚合:数据来一条计算一次(调用add方法)

4.窗口输出时调用一次getResult方法

5.输入类型、累加器、输出类型 三者可以类型不一致

1.5.2 全窗口函数(full window functions)

全窗口函数与增量聚合函数不同,增量聚合函数是数据来一条处理一条,而全窗口函数是将数据全部收集起来,等到窗口触发时才统一计算。并且全窗口函数的可以获取更多的信息,例如窗口的上下文信息(比如窗口的结束时间)。

在Flink中,全窗口函数也有两种:WindowFunction(不推荐)ProcessWindowFunction

(1) 窗口函数(WindowFunction

WindowFunction 是老版本的通用窗口函数接口,但是没有提供更多的信息,也没有提供高级的功能,所以不推荐使用,可以被ProcessWindowFunction全覆盖。

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

(2)  处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。

/*** 窗口函数:全窗口函数 Process*/
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));SingleOutputStreamOperator<String> process = sensorWs.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/**** @param s The key for which this window is evaluated. 该窗口的 Key* @param context The context in which the window is being evaluated. 窗口上下文* @param elements The elements in the window being evaluated.  窗口中所有的数据* @param out A collector for emitting elements.    采集器* @throws Exception*/@Overridepublic void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}});process.print();env.execute();
}

输入:

[root@VM-55-27-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5

输出:

窗口的开始时间:2023-11-18 11:13:30 000--窗口的结束时间:2023-11-18 11:13:00 000 
--key 为s1的窗口数据包含
[
[WaterSensor{id='s1', ts=1, vc=1}, 
WaterSensor{id='s1', ts=2, vc=2}, 
WaterSensor{id='s1', ts=3, vc=3},
WaterSensor{id='s1', ts=4, vc=4}, 
WaterSensor{id='s1', ts=5, vc=5}]
] 
5条数据

1.5.3 增量聚合和全窗口函数的结合使用

增量函数的优点是数据来一条处理一条,只存储中间计算值,所以占用的空间少。而全窗口需要储存窗口内的所有数据,最后再进行统一计算,但可以上下文获取到更多的窗口信息。在实际开发中,则可以结合这两者的优点。

在调用WindowedStream的.reduce()和.aggregate()方法时,不止可以传入一个ReduceFunction或AggregateFunction进行增量聚合,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内WindowedStream<WaterSensor, String, TimeWindow> sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));/*** 增量聚合函数 与 全窗口函数 一起使用*/SingleOutputStreamOperator<String> aggregate = sensorWs.aggregate(new MyAggregateFunc(),new MyProcessFunc());// 输出窗口计算结果aggregate.print();env.execute();}private static class MyAggregateFunc implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("初始化累加器");return 0;}@Overridepublic Integer add(WaterSensor waterSensor, Integer integer) {System.out.println("调用add方法 当前数据:" + waterSensor);return integer + waterSensor.getVc();}@Overridepublic String getResult(Integer integer) {System.out.println("调用getResult方法");return integer.toString();}@Overridepublic Integer merge(Integer integer, Integer acc1) {System.out.println("调用merge方法");return null;}}private static class MyProcessFunc extends ProcessWindowFunction<String,String,String, TimeWindow> {@Overridepublic void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含" + elements + " " + count + "条数据");}}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,5,5

输出:

初始化累加器
调用add方法 当前数据:WaterSensor{id='s1', ts=1, vc=1}
调用add方法 当前数据:WaterSensor{id='s1', ts=2, vc=2}
调用add方法 当前数据:WaterSensor{id='s1', ts=3, vc=3}
调用add方法 当前数据:WaterSensor{id='s1', ts=4, vc=4}
调用add方法 当前数据:WaterSensor{id='s1', ts=5, vc=5}
调用getResult方法
窗口的开始时间:2023-11-18 11:51:30 000--窗口的结束时间:2023-11-18 11:52:00 000 --key 为s1的窗口数据包含[15] 1条数据

增量聚合和全窗口函数的结合使用的效果:增量聚合结束后将计算结果(只有一条)发给全窗口函数进行处理。

二.时间语义

2.1 Flink中的时间语义

数据在网络传输中会存在一定的延迟,也意味着数据从生产到Flink真正处理的时间也存在延迟。数据被生产的时刻则被称为“事件时间”,数据被Flink真正处理的时刻被称为“处理时间”,到底以哪一种时间作为衡量标准,就是所谓的“时间语义”。

在实际应用中,事件时间往往会被作为参数传递,例如MySQL数据表的 create_time 字段,或者是前端传来的时间戳。使用事件时间更能保证数据的准确性。

从 Flink 1.12 版本开始,事件时间为默认的时间语义

三. 水位线(Watermark)

3.1事件时间和窗口

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟代表的是数据的时间进展,而不会随着系统时间而自动流逝,而是靠新数据的时间戳来推动的,且只会向前推进

这样的好处在于,在 Flink 的计算过程中可以完全不依赖系统时间,不论何时进行统计处理,都可以靠着逻辑时钟保证窗口计算的正确性。

3.2 什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线其实就是一个时间戳,作为数据流的标记,用来指示当前的事件时间,当某个数据到来之后,就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线是会被 Flink 存储的。

(1) 有序流中的水位线 

1、理想状态(数据量小):数据可以按照生成顺序进入流中,每条数据产生一个水位线。

844bbbf2d8f0402d9ee361521850ecd6.png

2、实际应用中,如果当前数据量非常大,数据间的时间差非常小, 如果也按照每条数据产生一个水位线则会非常影响效率。所以为了提高效率,一般可以每隔一段时间生成一个水位线。        

dc5c70f9e28c425caa719d8d13a6dd13.png

(2) 乱序流中的水位线 

在分布式系统中,由于网络传输的不确定性,可能导致数据达到的时间并不是有序的,这就是“乱序数据”。

585812db7c7e4806845f0a453cdabc9f.png

1、乱序流+数据量小

还是靠数据来驱动,来一条数据就提取其时间戳作为水位线插入,不过现在是乱序数据,在生成水位线前,需要先判断当前数据的时间戳是否大于之前的水位线,如果大于才生成新的水位线,否则就不生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

bdc9e199a3854d2bb1b19807d1764d99.png

2、乱序流+数据量大

数据量大则可以周期性的生成水位线来提升效率。并且保存之前数据的最大时间戳,需要插入水位线时,可以将这个最大时间戳作为水位线插入。

eb31ed56b366482f999058bd5fc26ca0.png

3、乱序流+迟到数据

迟到数据指的是,例如有一个第9秒生产的数据,但是第11秒才到达Flink,那么则会落到[ 10 , 20 ) 的窗口中。

我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒。必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。

3841584f0eed4cb297e2f907fd60f366.png

现在我们知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要 

3.3 水位线和窗口的工作原理

在 Flink 中,窗口其实并不是一个固定位置的框,而是理解为一个“”,在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(对应窗口);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

678638f52b944bafb1f751d399acc9a5.png

3.4 生成水位线

3.4.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果希望处理更快、实时性更强,则可以将水位线延迟设置得低些,不过这样会导致很多迟到数据被窗口遗漏,计算结果不准确;如果计算结果的准确性有要求,则可以将水位线延迟设置得高些,这样会导致处理延迟增加。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.4.2 水位线生成策略

在 Flink 的 DataStream API 中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

DataStream<Event> stream = env.addSource(new DataSource());DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<WatermarkStrategy>);

说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。

3.4.3 Flink内置水位线

(1) 有序流(时间戳单调递增)中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,而不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// ***定义 WaterMark 策略WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy// 单调递增的事件时间,没有延迟时间.<WaterSensor>forMonotonousTimestamps()// 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {/**** @param waterSensor 当前数据* @param l* @return*/@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {System.out.println("当前数据:" + waterSensor + " ==> l:" + l);// 从数据中返回的时间戳(毫秒))return waterSensor.getTs() * 1000L;}});// ***指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);sensorWithWaterMark.keyBy(WaterSensor::getId)// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}}).print();env.execute();
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s1,4,4
s1,7,7
s1,9,9
s1,10,10
s1,15,15
s1,20,20

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=4, vc=4} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=3, vc=3}, WaterSensor{id='s1', ts=4, vc=4}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}]] 6条数据当前数据:WaterSensor{id='s1', ts=15, vc=15} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=20, vc=20} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:10 000--窗口的结束时间:1970-01-01 08:00:20 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=15, vc=15}]] 2条数据

有序流的水位线设置非常简单,就是把数据中表示事件时间的属性返回

(2) 乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟时间的结果再-1。调用WatermarkStrategy. forBoundedOutOfOrderness()传入最大延迟时间。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// ***定义 WaterMark 策略WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间).<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s// 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {/**** @param waterSensor 当前数据* @param l* @return*/@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {System.out.println("当前数据:" + waterSensor + " ==> l:" + l);// 返回的时间戳(毫秒))return waterSensor.getTs() * 1000L;}});// ***指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);sensorWithWaterMark.keyBy(WaterSensor::getId)// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}}).print();env.execute();}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1    // 水位线 = -2
s1,2,2    // 水位线 = -1
s1,6,6    // 水位线 = 3
s1,8,8    // 水位线 = 5
s1,5,5    // 水位线 = 2
s1,9,9    // 水位线 = 6
s1,10,10  // 水位线 = 7
s1,7,7    // 水位线 = 4
s1,12,12  // 水位线 = 9
s1,3,3    // 水位线 = 0
s1,13,13  // 水位线 = 10 窗口触发关闭

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=6, vc=6} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=8, vc=8} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}, WaterSensor{id='s1', ts=8, vc=8}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=3, vc=3}]] 8条数据

(3) 内置水位线生成原理

  • 都是周期性生成水位线的,默认200ms
  • 有序流水位线生成:当前最大事件时间 - 1ms
  • 乱序流水位线生成:当前最大事件时间 - 最大等待时间 - 1ms
  • 有序流就是一种特殊的乱序流,最大等待时间为0

3.4.4 自定义水位线生成器 

(1) 周期性水位线生成器(Periodic Generator)

需要自定义周期性水位线生成器则可以实现WatermarkGenerator<T>中的onEvent 记录事件时间、onPeriodicEmit 周期性生成水位线。

public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {private Long maxTs; // 当前最大时间戳private Long delayTime; // 等待时间public MyPeriodWatermarkGenerator(Long delayTime) {this.delayTime = delayTime;maxTs = Long.MIN_VALUE - this.delayTime - 1 ;}/*** 每条数据来都会调用一次该方法。主要用于记录、更新当前最大的时间戳* @param t* @param l* @param watermarkOutput*/@Overridepublic void onEvent(T t, long l, WatermarkOutput watermarkOutput) {maxTs = Math.max(maxTs , l);}/*** 周期性调用,主要用于周期性生成 Watermark* @param watermarkOutput*/@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {watermarkOutput.emitWatermark(new Watermark(maxTs));}
}

在选择水位线生成器时,则可以使用这个自定义水位线生成器。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> sensorDS = env.addSource(new DataSource()).map(new MyMapFunctionImpl());WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forGenerator(new WatermarkStrategy<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {// 使用自定义水位生成器并设置等待时间return new MyPeriodWatermarkGenerator<>(3000l);}}) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {// 事件时间提取器@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {return waterSensor.getTs() * 1000L;}
});

(2) 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即生成水位线。我们把生成水位线的逻辑写在onEvent方法当中即可。

/*** 断点式水位线*/
public class MyPuntuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {private Long maxTs; // 当前最大时间戳private Long delayTime; // 等待时间public MyPuntuatedWatermarkGenerator(Long delayTime) {this.delayTime = delayTime;maxTs = Long.MIN_VALUE - this.delayTime - 1 ;}/*** 每条数据来都会调用一次该方法。主要用于记录、更新当前最大的时间戳,并且立即更新当前水位线* @param t* @param l* @param watermarkOutput*/@Overridepublic void onEvent(T t, long l, WatermarkOutput watermarkOutput) {maxTs = Math.max(maxTs , l);watermarkOutput.emitWatermark(new Watermark(maxTs));}/*** 周期性调用,断点式不需要* @param watermarkOutput*/@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {watermarkOutput.emitWatermark(new Watermark(maxTs));}
}

(3) 在数据源中生成水位线

我们可以直接在自定义的数据源中抽取事件时间,然后生成水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();// 在数据源中发送水位线 乱序流-延迟3senv.fromSource(fileSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"file").print();env.execute();}

3.5  水位线的传递

水位线并不能代表整个程序的处理进度,而是只能代表某个子任务的处理进度,因为水位线会随着数据往下游传递,也就是说不同节点的处理进度是不一样的。

在多并行度下的流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,会以最小的那个作为当前任务的事件时钟,向下游传递。

34230cd1ed604473a9384184af97db50.png

例子:并行度为2,算子链为:source -> map -> 水位线乱序 延迟3s ->  时间滚动窗口 大小为10

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// ***定义 WaterMark 策略WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间).<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s// 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {/**** @param waterSensor 当前数据* @param l* @return*/@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {System.out.println("当前数据:" + waterSensor + " ==> l:" + l);// 返回的时间戳(毫秒))return waterSensor.getTs() * 1000L;}});// ***指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);sensorWithWaterMark.keyBy(WaterSensor::getId)// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}}).print();env.execute();
}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,5,5
s1,7,7
s1,9,9
s1,10,10
s1,11,11
s1,12,12
s1,13,13
s1,14,14

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=11, vc=11} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=14, vc=14} ==> l:-9223372036854775808
2> 窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}]] 5条数据

可以看到,在并行度为2下,水位线为10 (13-3) 时,并没有触发窗口输出,这就是因为在多并行度下,一个任务接收到多个上游并行任务传递来的水位线时,会以最小的那个作为当前任务的事件时钟,向下游传递。

分析:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1   // 水位线:-2
s1,2,2   // 水位线: -1,多并行度下,向下传递最小的水位线 -2
s1,5,5   // 水位线: 2, 多并行度下,向下传递最小的水位线 -1
s1,7,7   // 水位线: 4, 多并行度下,向下传递最小的水位线  2
s1,10,10 // 水位线: 7, 多并行度下,向下传递最小的水位线  4
s1,11,11 // 水位线: 8, 多并行度下,向下传递最小的水位线  7
s1,12,12 // 水位线: 9, 多并行度下,向下传递最小的水位线  8
s1,13,13 // 水位线: 10,多并行度下,向下传递最小的水位线  9
s1,14,14 // 水位线: 11,多并行度下,向下传递最小的水位线  10 触发[ 0 , 10 )窗口输出

3.5.1 水位线的空闲等待

在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。

例子:将数据以奇偶的规则放在不同的 KeyBy 分区,观察水位线推进

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// !并行度为2
env.setParallelism(2);SingleOutputStreamOperator<Integer> socketDs = env.socketTextStream("xxx.xxx.xxx.xxx", 1234)// 重分区:当前数据 % 下游算子并行度.partitionCustom(new MyPartitioner(), r -> r)// 将输入的字符串转为整型.map(r -> Integer.parseInt(r)).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps() // 单调递增的事件时间.withTimestampAssigner((r, ts) -> r * 1000l) // 水位线提取器,将数据*1000作为水位线);socketDs.keyBy( r -> r % 2) // 将数据奇偶划分,在两个不同的分区.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 开窗:十秒的滑动窗口.process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer integer, ProcessWindowFunction<Integer, String, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + integer + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}}).print();env.execute();

输入奇数:

[root@VM-55-24-centos ~]# nc -lk 1234
3
5
7
9
11
13

 结果:控制台并无任何输出,也就意味着 [ 0 , 10 )窗口并没有被触发。

这是因为在多个上游并行任务中,当前task会以最小的那个作为当前任务的事件时钟,而将数据分为奇偶,则奇数在一个分区,偶数在一个分区,只输入奇数,那么另一个分区为空,就会导致当前水位线一直是 Long.MIN_VALUE ,从而无法正常推进水位线。

19f6e7de126d4c9fa94ed0c19c1abde0.png

解决这个问题则可以通过设置空闲窗口时间withIdleness

WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
// 设置空闲窗口时间为3s 窗口空闲3s则将空窗口的水位线标记为闲置
.withIdleness(Duration.ofSeconds(3));

官方介绍:

There are two places in Flink applications where a WatermarkStrategy can be used: 1) directly on sources and 2) after non-source operation.

The first option is preferable, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic. Sources can usually then track watermarks at a finer level and the overall watermark produced by a source will be more accurate. Specifying a WatermarkStrategy directly on the source usually means you have to use a source specific interface/ Refer to Watermark Strategies and the Kafka Connector for how this works on a Kafka Connector and for more details about how per-partition watermarking works there.

The second option (setting a WatermarkStrategy after arbitrary operations) should only be used if you cannot set a strategy directly on the source:

如果其中一个输入分割/分区/碎片有一段时间不携带事件,这意味着水印生成器也不能获得任何新的信息来为水印做基础。我们称之为空闲输入或空闲源。这是一个问题,因为您的一些分区可能仍然带有事件。在这种情况下,水印将被保留,因为它被计算为所有不同的平行水印的最小值。

为了解决这个问题,你可以使用一个 WatermarkStrategy 来检测闲置状态并将输入标记为闲置。为此,WatermarkStrategy 提供了一个方便的帮助器:

WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));

3.6 迟到数据的处理

迟到数据与乱序不同:乱序指的是数据到达Flink时的事件时间并不一定是顺序的;而迟到数据指的是当前数据的事件时间小于当前水位线,例如上一个窗口已经关闭了,属于上一个窗口的数据才达到。

3.6.1 推迟水位线推进

在水位线产生时,设置一个乱序容忍度(延迟时间),推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

// 水位线生成延时10s
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

3.6.2 设置窗口延迟关闭

 窗口的生命周期是:触发计算 -> 销毁/关闭窗口。设置窗口延迟时间其实就是延长窗口的销毁/关闭时间。

设置窗口延迟时间也就是允许数据迟到。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

未设置窗口延迟关闭:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// ***定义 WaterMark 策略WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间).<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s// 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {/**** @param waterSensor 当前数据* @param l* @return*/@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {System.out.println("当前数据:" + waterSensor + " ==> l:" + l);// 返回的时间戳(毫秒))return waterSensor.getTs() * 1000L;}});// ***指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);sensorWithWaterMark.keyBy(WaterSensor::getId)// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}}).print();env.execute();
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,5,5
s1,10,10
s1,12,12
s1,13,13
s1,3,3

 输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=5, vc=5}]] 2条数据
当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808

可以看到,当 s1,13,13 到来时,触发了 [ 0 , 10 ) 的窗口关闭,随后来的 s1,3,3 并不会再被 [ 0 , 10 ) 窗口计算。

设置窗口延迟关闭:

sensorWithWaterMark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(3)) // 允许窗口延迟3s关闭

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,2,2
s1,8,8
s1,13,13
s1,14,14
s1,5,5

输出:

当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=8, vc=8} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=8, vc=8}]] 2条数据
当前数据:WaterSensor{id='s1', ts=14, vc=14} ==> l:-9223372036854775808
当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808
窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=8, vc=8}, WaterSensor{id='s1', ts=5, vc=5}]] 3条数据

1.窗口允许迟到,则在关窗前,每一条迟到的数据达到,都会被窗口触发计算输出。

2.窗口真正被关闭后,迟到数据则无法再进入窗口。

3.6.3 使用侧流接收迟到的数据

利用之前的推迟水位线推进或设置窗口延迟关闭的方法,对于真正关窗的迟到数据都无法进行处理,Flink 提供了 sideOutputLateData() 将关窗后的迟到数据放入侧输出流

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// ***定义 WaterMark 策略WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间).<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s// 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {/**** @param waterSensor 当前数据* @param l* @return*/@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {// 返回的时间戳(毫秒))return waterSensor.getTs() * 1000L;}});// ***指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);// ***指定侧输出流存放关窗后的迟到数据OutputTag outputTag = new OutputTag("late-data", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorWithWaterMark.keyBy(WaterSensor::getId)// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(3)) // 允许窗口延迟3s关闭.sideOutputLateData(outputTag).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s,ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");long count = elements.spliterator().estimateSize();out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");}});// 获取侧输出流process.getSideOutput(outputTag).printToErr("测输出流中的迟到数据" + outputTag.getTypeInfo());process.print();env.execute();
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,5,5
s1,10,10
s1,9,9
s1,13,13
s1,15,15
s1,16,16
s1,8,8
s1.12.12

 输出:

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}]] 3条数据
·测输出流中的迟到数据PojoType<com.lc.bean.WaterSensor, fields = [id: String, ts: Long, vc: Integer]>> WaterSensor{id='s1', ts=8, vc=8}
·测输出流中的迟到数据PojoType<com.lc.bean.WaterSensor, fields = [id: String, ts: Long, vc: Integer]>> WaterSensor{id='s1', ts=12, vc=12}

可以看到,在输入 s1,16,16 时已经关闭了 [ 0 , 10 ) 的窗口,后面迟到的数据放入了侧输出流。 

乱序、迟到的数据处理设置经验:

  1. watermark等待时间不宜设置过大,一般是秒级别,在乱序和延迟之间取舍。
  2. 设置一定的窗口允许迟到,只考虑大部分迟到数据,不考虑极端小部分的迟到数据。
  3. 极端小部分迟到数据,放入侧输出流,获取后做处理。

三.基于时间的合流——双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

3.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中匹配公共键(key)的数据放在窗口中进行配对处理。

3.1.1 窗口联结的调用

用法:

stream1.join(stream2).where(<KeySelector>) // 指定流1要连接的Key.equalTo(<KeySelector>) // 指定流2要连接的Key.window(<WindowAssigner>) // 两条流一起开窗.apply(<JoinFunction>) // 处理逻辑

例子:两条流,匹配出同一时间内的Key相同的数据。

/*** Window Join*/
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =         StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 数据流 1SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(new Tuple2<>("a", 1),new Tuple2<>("a", 2),new Tuple2<>("b", 3),new Tuple2<>("b", 4),new Tuple2<>("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps() // 单调递增的事件时间.withTimestampAssigner((v, ts) -> v.f1 * 1000));    // 水位线提取器// 数据流 1SingleOutputStreamOperator<Tuple3<String, Integer , Integer>> ds2 = env.fromElements(new Tuple3<>("a", 3 , 11),new Tuple3<>("a", 11 , 22),new Tuple3<>("b", 6 , 3),new Tuple3<>("b", 9 , 13),new Tuple3<>("c", 10 , 12)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer , Integer>>forMonotonousTimestamps()   // 单调递增的事件时间.withTimestampAssigner((v, ts) -> v.f1 * 1000));     // 水位线提取器/*** Window Join* 1、落在同一时间窗口内的数据才能匹配* 2、根据数据中的某个Key进行匹配* 3、只能获取匹配成功的数据* 4、类似 Inner Join*/DataStream<String> join = ds1 // 第一条流.join(ds2) // join 第二条流.where(r1 -> r1.f0) // 第一条流中要匹配的key.equalTo(r2 -> r2.f0) // 第一条流中要匹配的key.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 十秒滚动窗口.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {@Overridepublic String join(Tuple2<String, Integer> v1, Tuple3<String, Integer, Integer> v2) throws Exception {return v1 + "<===匹配===>" + v2;}});env.execute();
}

结果:

(a,1)<===匹配===>(a,3,11)
(a,2)<===匹配===>(a,3,11)
(b,3)<===匹配===>(b,6,3)
(b,3)<===匹配===>(b,9,13)
(b,4)<===匹配===>(b,6,3)
(b,4)<===匹配===>(b,9,13)

 只有Key匹配,且落在同一时间窗口的数据才能匹配。

3.1.2 间隔联结(Interval Join

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理,因为数据很可能卡在窗口的两侧边缘,例如 a,5 与 a,11 ,虽然Key相匹配,但是a,11属于 [ 10 , 20 ] 的窗口,则无法匹配,显然基于时间的窗口联合无能为力。

为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

下方的流a去间隔联结上方的流b,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 数据流 1SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("xxx.xxx.xxx", 1234).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] data = s.split(",");return Tuple2.of(data[0],Integer.valueOf(data[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()   // 单调递增的事件时间.withTimestampAssigner((v, ts) -> v.f1 * 1000)); ;// 数据流 2SingleOutputStreamOperator<Tuple3<String, Integer , Integer>> ds2 = env.socketTextStream("xxx.xxx.xxx", 4321).map(new MapFunction<String, Tuple3<String, Integer,Integer>>() {@Overridepublic Tuple3<String, Integer ,Integer> map(String s) throws Exception {String[] data = s.split(",");return Tuple3.of(data[0],Integer.valueOf(data[1]),Integer.valueOf(data[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer , Integer>>forMonotonousTimestamps()   // 单调递增的事件时间.withTimestampAssigner((v, ts) -> v.f1 * 1000));     // 水位线提取器;// 两条流分别KeyBy,Key就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(k1 -> k1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(k2 -> k2.f0);// 定义两个侧输出流存放左右流的迟到数据OutputTag<Tuple2<String, Integer>> leftLateTag = new OutputTag<>("left-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> rightLateTag = new OutputTag<>("right-late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));// 调用 Interval join 间隔联合SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 上界偏移-2s,下界偏移2s.sideOutputLeftLateData(leftLateTag)   // 第一条流的迟到数据放入侧输出流.sideOutputRightLateData(rightLateTag) // 第二条流的迟到数据放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/**** @param left join左边的流* @param right join右边的流* @param ctx 上下文信息* @param out 采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {out.collect(left + "<---匹配--->" + right);}});process.getSideOutput(leftLateTag).printToErr("左流迟到数据");process.getSideOutput(rightLateTag).printToErr("右流迟到数据");process.print();env.execute();
}

输入:

[root@VM-12-13-centos ~]# nc -lk 1234
1> a,6
3> a,10
[root@VM-12-13-centos ~]# nc -lk 4321
2> a,5,5
4> a,12,12
5> a,4,4

输出:

(a,6)<---匹配--->(a,5,5)
(a,10)<---匹配--->(a,12,12)
右流迟到数据> (a,4,4)

Interval join
1、只支持事件时间
2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
3、process中,只能处理匹配上的数据
4、两条流关联后的watermark,以两条流中最小的为准
5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
        => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/218880.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据通信网络基础

数据通信网络基础&#xff08;1&#xff09; 一.前言 • 在人类社会的起源和发展过程中&#xff0c;通信就一直伴随着我们。从20世纪七、八十年代开始&#xff0c; 人类社会已进入到信息时代&#xff0c;对于生活在信息时代的我们&#xff0c;通信的必要性更是不言而喻 的。…

System作为系统进程陔如何关闭?

一、简介 system进程是不可以关闭的&#xff0c;它是用来运行一些系统命令的&#xff0c;比如reboot、shutdown等&#xff0c;以及用来运行一些后台程序&#xff0c;比如ntfs-3g、v4l2loopback等。system进程也被用于运行一些内核模块&#xff0c;比如nvidia、atd等。system进程…

pl_vio线特征

pl_vio线特征 0.引言1.LineFeatureTracker核心逻辑解读2.estimator_node中线段的处理2.1.订阅信息解压2.2.线特征管理 3.线段三角化3.1.普吕克线坐标3.2.正交表示 4.线段残差对位姿的导数4.1.直线的观测模型和误差4.2.误差雅克比推导 0.引言 PL-VIO&#xff0c;本文关注线段。…

web服务器之——www服务器的基本配置

目录 一、www简介 1、什么是www 2、www所用的协议 3、WEB服务器 4、主要数据 5、浏览器 二、 网址及HTTP简介 1、HTTP协议请求的工作流程 三、www服务器的类型(静态网站&#xff08;HTML&#xff09;&#xff0c; 动态网站(jsp python,php,perl)) 1、 仅提供…

VM虚拟机打不开原来保存的虚拟机文件夹ubuntu

VMWare虚拟机打不开原来保存的虚拟机文件夹ubuntu 换了电脑把之前的虚拟机克隆的文件夹直接拿来用 报这个错&#xff1a; 指定的文件不是虚拟磁盘 打不开磁盘“D:\ubuntu_iso\ubuntu_location\Ubuntu 64 位-s002.vmdk”或它所依赖的某个快照磁盘。 模块“Disk”启动失败。 未…

docker部署go gin框架 Windows环境

目录 文章目的是什么 环境介绍 Windows 环境下 docker 部署 go gin 详细步骤 运行容器时因为挂载文件可能会出现的问题 直接部署gin&#xff08;跳过运行容器时因为挂载文件可能会出现的问题&#xff09; 文章目的是什么 假设我们学习了 go 语言&#xff0c;在 Windows(本…

Redis List类型

列表类型是用来存储多个有序的字符串&#xff0c;如图所示&#xff0c;a、b、c、d、e 五个元素从左到右组成了一个有序的列表&#xff0c;列表中的每个字符串称为元素 (element)&#xff0c;一个列表最多可以存储2的32次方 -1个元素。在 Redis 中&#xff0c;可以对列表两端插入…

智能优化算法应用:基于松鼠算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于松鼠算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于松鼠算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.松鼠算法4.实验参数设定5.算法结果6.参考文献7.MA…

虚幻学习笔记14—重叠和碰撞事件

一、前言 在开发应用当中两个物体的重叠和碰撞事件会经常用到&#xff0c;在虚幻中哲两个有很大的区别&#xff0c;在官方文档碰撞概述其实已经讲了怎样发生碰撞和重叠&#xff0c;但是还是遗漏不少注意事项合细节&#xff0c;主要文档写的太粗糙了&#xff0c;这也让我在使用的…

CSS的三大特性(层叠性、继承性、优先级---------很重要)

CSS 有三个非常重要的三个特性&#xff1a;层叠性、继承性、优先级。 层叠性 场景&#xff1a;相同选择器给设置相同的样式&#xff0c;此时一个样式就会覆盖&#xff08;层叠&#xff09;另一个冲突的样式。层叠性主要解决样式冲突 的问题 原则&#xff1a;  样式冲突&am…

【JAVA日志框架】JUL,JDK原生日志框架详解。

前言 Java日志体系混乱&#xff1f;Java日志框架系列&#xff0c;清晰简洁整理好整个Java的日志框架体系。第一篇&#xff0c;JDK原生日志框架——JUL。 目录 1.概述 2.日志级别 3.配置 4.继承关系 1.概述 日志框架的核心问题&#xff1a; 日志是用来记录应用的一些运行…

uniapp实现检查版本检测,更新

1.首先需要获取当前app的版本 const systemInfo uni.getSystemInfoSync();// 应用程序版本号// #ifdef APPme.version systemInfo.appWgtVersion;// #endif// #ifdef H5me.version systemInfo.appVersion;// #endif2.在获取到服务器保存的app版本 3.点击按钮验证版本号 //…

Docker单机部署OceanBase

文章目录 说明机器软硬件要求指导文档本次部署环境说明 OceanBase单机部署&#xff08;Docker&#xff09;一&#xff1a;拉取 OceanBase 数据库相关镜像二&#xff1a;启动 OceanBase 数据库实例完整启动日志展示 三&#xff1a;连接实例遇到报错&#xff1a;没有mysql客户端 …

关联规则 python实现Apriori算法

python实现Apriori算法 根据我们上个博客的例子 def load_dataset():# 载入数据集的函数dataset [[A, , D],[B, C, E],[A, B, C, E],[B, E]]return datasetdef create_candidates(dataset):# 生成1项集的候选集函数candidates []for transaction in dataset:for item in t…

电子学会C/C++编程等级考试2021年06月(六级)真题解析

C/C++等级考试(1~8级)全部真题・点这里 第1题:逆波兰表达式 逆波兰表达式是一种把运算符前置的算术表达式,例如普通的表达式2 + 3的逆波兰表示法为+ 2 3。逆波兰表达式的优点是运算符之间不必有优先级关系,也不必用括号改变运算次序,例如(2 + 3) * 4的逆波兰表示法为* +…

Java 数据结构篇-实现堆的核心方法与堆的应用(实现 TOP-K 问题:最小 k 个数)

文章目录 1.0 堆的说明 2.0 堆的成员变量及其构造方法 3.0 实现堆的核心方法 3.1 实现堆的核心方法 - 获取堆顶元素 peek() 3.2 实现堆的核心方法 - 下潜 down(int i) 3.3 实现堆的核心方法 - 交换元素 swap(int i,int j) 3.4 实现堆核心方法 - 删除堆顶元素 poll() 3.5 实现堆…

论文阅读——Semantic-SAM

Semantic-SAM可以做什么&#xff1a; 整合了七个数据集&#xff1a; 一般的分割数据集&#xff0c;目标级别分割数据集&#xff1a;MSCOCO, Objects365, ADE20k 部分分割数据集&#xff1a;PASCAL Part, PACO, PartImagenet, and SA-1B The datasets are SA-1B, COCO panopt…

第15章 《乐趣》Page305~311, 代码精简以后,讨论一下引用含义的问题

将Page305~311的代码精简了一下&#xff0c;讨论一下引用含义的问题&#xff0c;精简之后的代码如下&#xff1a; #include <iostream> #include <SDL2/SDL.h>using namespace std;namespace sdl2 {char const* last_error() {return SDL_GetError(); }struct Ini…

10 新字符设备驱动文件

一、新字符设备驱动原理 因为 register_chrdev 和 unregister_chrdev 两个函数是老版本驱动文件&#xff0c;现在可以用新字符设备驱动 API 函数。 1. 分配和和释放设备号 使用 register_chrdev 函数注册字符设备的时候只需要给定一个主设备号即可&#xff0c;但是这样会带来两…

pytest之allure测试报告03:allure动态自定义报告

1、测试用例模块中引入allure&#xff1a;import allure 2、yaml文件中定义添加title、story的值&#xff1a; 3、测试用例中读取调用。eg:allure.dynamic.title() 4、运行报告查看&#xff1a;成功动态展示yaml文件中配置的story、title