尚硅谷Flink(三)时间、窗口

1

🎰🎲🕹️

🎰时间、窗口

🎲窗口

🕹️是啥

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 

在Flink中,窗口其实并不是一个“框”,应该把窗口理解成一个“桶”。在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭

事实上“触发计算”和“窗口关闭”两个行为也可以分开

🕹️分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

1)时间窗口(Time Window)
时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。

(2)计数窗口(Count Window)
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”。

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)

  • 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口。比如我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。

  • 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

  • 会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最大距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

  • “全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

🕹️api

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。 

(1)按键分区窗口(Keyed Windows) 
经过按键分区 keyBy 操作后,数据流会按照key 被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

stream.keyBy(...) .window(...)

(2)非按键分区(Non-Keyed Windows) 
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。 

stream.windowAll(...) 

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll
本身就是一个非并行的操作。 

stream.keyBy(<key selector>) 
       .window(<window assigner>) 
       .aggregate(<window function>) 

 其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种

🕹️窗口分配器

        // 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话KS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));KS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));KS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));KS.window(GlobalWindows.create());KS.countWindow(5);  // 窗口数据长度5KS.countWindow(5, 2);  // 滑动

🕹️窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么, 其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。

增量聚合Reduce

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value -> value.id);// 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));window.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {System.out.println("reduce, t1: t2 = "+t1+": "+t2);return new WaterSensor(t1.getId(), t1.getTs(), t1.getVc()+t2.getVc());}}).print();env.execute();}

Aggregate

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合 状态的类型、输出结果的类型都必须和输入数据类型一样。

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:

输入类型 (IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型OUT当然就是最终计算结果 的类型了。

接口中有四个方法:

⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚 合任务只会调用一次。

⚫ add():将输入的元素添加到累加器中。

⚫ getResult():从累加器中提取聚合的输出结果。

⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value -> value.id);// 窗口分配器 指定用什么窗口 时间、计数————滚动、滑动、会话WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));//  类型形参://  <IN> – The type of the values that are aggregated (input values)//  <ACC> – The type of the accumulator (intermediate aggregate state).//  <OUT> – The type of the aggregated resultSingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator()");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("add");return value.getVc() + accumulator;}@Overridepublic String getResult(Integer accumulator) {return "getResult " + accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {// 会话窗口才用得到System.out.println("用不到的merge");return null;}});aggregate.print();env.execute();}

全窗口函数

有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意 义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增 量聚合函数做不到的。

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口 要输出结果的时候再取出数据进行计算。WindowFunction 和 ProcessWindowFunction。

1)窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们 可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作 用可以被ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。

2)处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最 底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到 一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以 访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,其 实就是一个增强版的 WindowFunction。 事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的 一员,关于处理函数我们会在后续章节展开讲解。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value -> value.id);WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));/*** 类型形参:* <IN> – The type of the input value.* <OUT> – The type of the output value.* <KEY> – The type of the key.* <W> – The type of Window that this window function can be applied on.*/SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/**** @param s The key 分组的key* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception*/@Overridepublic void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {// 上下文可以拿到的东西long start = context.window().getStart();long end = context.window().getEnd();String StartTime = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");String EndTime = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect(s + "窗口[" + StartTime + "———" + EndTime + "] 有 " + count + " 条数据");}});process.print();/*** 16> s1窗口[2023-10-16 10:33:30.000———2023-10-16 10:33:35.000] 有 1 条数据* 16> s1窗口[2023-10-16 10:33:40.000———2023-10-16 10:33:45.000] 有 3 条数据* 16> s1窗口[2023-10-16 10:33:50.000———2023-10-16 10:33:55.000] 有 6 条数据* 16> s1窗口[2023-10-16 10:33:55.000———2023-10-16 10:34:00.000] 有 7 条数据*/env.execute();}

agg、pro合体

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);KeyedStream<WaterSensor, String> KS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}}).keyBy(value -> value.id);WindowedStream<WaterSensor, String, TimeWindow> window = KS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));SingleOutputStreamOperator<String> process = window.aggregate(new MyAgg(), new MyProcess());process.print();env.execute();}public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("createAccumulator()");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("add");return value.getVc() + accumulator;}@Overridepublic String getResult(Integer accumulator) {return "getResult " + accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {// 会话窗口才用得到System.out.println("用不到的merge");return null;}}public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {@Overridepublic void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {// 上下文可以拿到的东西long start = context.window().getStart();long end = context.window().getEnd();String StartTime = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");String EndTime = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect(s + "窗口[" + StartTime + "———" + EndTime + "] 有 " + count + " 条数据"+elements.toString());}}

🕹️触发器、移除器*

上述已经默认实现

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗 口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...) .window(...) .trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就 可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实 现的移除器。

stream.keyBy(...) .window(...).evictor(new MyEvictor())

🎲时间语义(瞎起名)

到底是以那种时间作为衡量标准,就是所谓的“时间语义”。

在实际应用中,事件时间语义(真正产生的时间)会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。

从 Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

🎲水位线

在窗口的处理过程中,我 们 可以基于数据的时间戳,自定义 一 个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进 展,就是靠着新到数据的时间戳 来推动的

这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计 处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同 步,只是略微有一点延迟,同时保证了窗口计算的正确性。

在 Flink 中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某 个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

1)有序流中的水位线

理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线;

实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往 往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线。

2)乱序流中的水位线

😅乱序 + 数据量小:在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是 所谓的“乱序数据”。 

情况是数据乱序,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

😅乱序 + 数据量大:如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时 只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位 线。

😅我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可 以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。 这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来 之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了

🕹️生成水位线原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的 数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义, 这在理论上可以得到最低的延迟。

所以 Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把 控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

🕹️内置水位线 

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是 周期性生成水位线的最简单的场景,直接调用 WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现

注意并行度输出

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();/*** 数据=WaterSensor{id='s1', ts=1, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=2, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=3, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=4, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=5, vc=1},recordTs=-9223372036854775808* key=s1的窗口[1970-01-01 08:00:00.000,1970-01-01 08:00:05.000)包含4条数据===>[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=1}, WaterSensor{id='s1', ts=3, vc=1}, WaterSensor{id='s1', ts=4, vc=1}]* 数据=WaterSensor{id='s1', ts=6, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=7, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=9, vc=1},recordTs=-9223372036854775808* 数据=WaterSensor{id='s1', ts=10, vc=1},recordTs=-9223372036854775808* key=s1的窗口[1970-01-01 08:00:05.000,1970-01-01 08:00:10.000)包含4条数据===>[WaterSensor{id='s1', ts=5, vc=1}, WaterSensor{id='s1', ts=6, vc=1}, WaterSensor{id='s1', ts=7, vc=1}, WaterSensor{id='s1', ts=9, vc=1}]*/}

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成 水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前 时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就 可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示 数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的 延迟,就可以等到所有的乱序数据了。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱的watermark,没有等待时间.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}

 周期性水位生成器

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> stream = env.socketTextStream("localhost", 7777);SingleOutputStreamOperator<WaterSensor> DS = stream.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {String[] list = s.split(",");return new WaterSensor(list[0], Long.valueOf(list[1]), Integer.valueOf(list[2]));}});// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱的watermark,没有等待时间.<WaterSensor>forGenerator(new WatermarkStrategy<WaterSensor>() {@Overridepublic WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWaterStrategy<>(3000L);}})// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = DS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}public static class MyWaterStrategy<T> implements WatermarkGenerator<T> {private long delay;private long maxTs;public MyWaterStrategy(long delay) {this.delay = delay;this.maxTs = Long.MIN_VALUE+this.delay+1;}@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {maxTs = Math.max(maxTs, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTs- delay -1));}}

🕹️并行,水位线传递  

而当一个任务接收到多个上游并行任务传递来的水位线时,应该以 最小的那个作为当前任务的事件时钟。

在多个上游并行任务中,如果有其中一个没有数据,由于当前 Task 是以最小的那个作为 当前任务的事件时钟,就会导致当前 Task 的水位线无法推进,就可能导致窗口无法触发。这 时候可以设置空闲等待。

.withIdleness(Duration.ofSecond(3))

迟到数据处理:

Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时 并不会关闭窗口。

以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到 wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.allowedLateness(Time.seconds(3))

允许迟到只能运用在 event time 上

🎲时间的合流

        可以发现,根据某个 key 合并两条流,与关系型数据库中表的 join 操作非常相近。事实 上,Flink 中两条流的 connect 操作,就可以通过 keyBy 指定键进行分组后合并,实现了类似 于 SQL 中的 join 操作;另外 connect 支持处理函数,可以使用自定义实现各种需求,其实已 经能够处理双流 join 的大多数场景。

        不过处理函数是底层接口,所以尽管 connect能做的事情多,但在一些具体应用场景下还 是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要 自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的 合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

🕹️窗口联结(Window Join)

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODOSingleOutputStreamOperator<Tuple2<String, Integer>> DS1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 7),Tuple2.of("b", 5),Tuple2.of("c", 3)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> (value.f1 * 1000L)));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> DS2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 8, 1),Tuple3.of("b", 8, 1),Tuple3.of("b", 5, 1),Tuple3.of("c", 3, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = DS1.join(DS2).where(x -> x.f0)// ds1的keyBy.equalTo(x -> x.f0)// ds2的keyBy.window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/**** @param first The element from first input.* @param second The element from second input.* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "-------" + second;}});join.print();env.execute();}

🕹️间隔联结(Interval Join)

Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。 顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间 隔,看这期间是否有来自另一条流的数据匹配。

案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个 例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户, 来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览据进行一个联结查询。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/164914.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ddns-go配合aliyun域名解析通过ipv6实现共享桌面

ddns-go配合aliyun域名解析通过ipv6实现共享桌面 前提&#xff1a; 必须拥有ipv6公网IP&#xff0c;测试IPv6 测试 (testipv6.cn) 如果是光猫拨号一点要选择ipv4和ipv6&#xff0c;同时要看光猫是否支持ipv6转发&#xff0c;如果不支持转发也不行&#xff0c;光猫不支持ipv6…

PC电脑 VMware安装的linux CentOs7如何扩容磁盘?

一、VM中进行扩容设置 必须要关闭当前CentOS&#xff0c;不然扩展按钮是灰色的。 输入值必须大于当前磁盘容量。然后点击扩展&#xff0c;等待扩展完成会提示一个弹框&#xff0c;点击确定&#xff0c;继续确定。 二、操作CentOS扩容——磁盘分区 第一步设置完成。那就启动 …

操作系统备考学习 day10

操作系统备考学习 day10 第三章 内存管理3.2 虚拟内存管理3.2.1 虚拟内存的基本概念传统存储管理方式的特征、缺点局部性原理虚拟内存的定义和特征如何实现虚拟内存技术 3.2.2 请求分页管理方式页表机制缺页中断机构地址变换机构 3.2.3 页面置换算法最佳置换算法&#xff08;OP…

ubuntu18.04 RTX3060 rangnet++训练 bonnetal语义分割

代码链接&#xff1a; https://github.com/PRBonn/lidar-bonnetal 安装anaconda环境为 CUDA 11.0&#xff08;11.1也可以&#xff09; anaconda环境如下 numpy1.17.2 torchvision0.2.2 matplotlib2.2.3 tensorflow1.13.1 scipy0.19.1 pytorch1.7.1 vispy0.5.3 opencv_python…

【五:Httprunner的介绍使用】

接口自动化框架封装思想的建立。httprunner&#xff08;热加载&#xff1a;动态参数&#xff09;&#xff0c;去应用 意义不大。 day1 一、什么是Httprunner? 1.httprunner是一个面向http协议的通用测试框架&#xff0c;目前最新的版本3.X。以前比较流行的 2.X的版本。2.它的…

开源的容器运行时项目 Podman

本心、输入输出、结果 文章目录 开源的容器运行时项目 Podman前言Podman 简介Podman 与 Docker 的区别Podman 在使用上和 Docker 有什么区别从构建者角度分析 Podman 在使用上和 Docker 有什么区别从使用者角度分析 Podman 在使用上和 Docker 有什么区别 Podman 常用命令容器镜…

maven 常用知识速记

创建项目 maven archetype:generate依赖范围 有如下依赖示例&#xff1a; <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.7</version><scope>test</scope> </dependency>其中…

18.项目开发之前端项目搭建测试

项目开发之前端项目搭建测试 解压文件&#xff0c;将前端项目目录&#xff0c;拖拽到HBuilder中 前端项目QuantTrade_vue地址&#xff1a;传送门 后端项目QuantTrade地址&#xff1a; https://pan.baidu.com/s/1GF45B0QepApH8JbRIOLY7w?pwd1016 开启idea的项目&#xff0c;先…

一个适合练手的接口测试实战项目——慕慕生鲜

前言 最近很多粉丝找小月要接口测试项目练练手&#xff0c;看看实力&#xff01;今天&#xff0c;它来了 慕慕生鲜&#xff0c;完整版&#xff0c;文末有福利&#xff01; 干货来咯&#xff0c;收藏好&#xff01; 1. 接口测试需求分析 常见接口文档提供的两种方式 ①wor…

四川竹哲电商:抖店怎么修改经营类目?

抖店是抖音推出的一款电商工具&#xff0c;通过抖店可以帮助商家在抖音上开展经营活动。在抖店平台上&#xff0c;商家需要选择经营类目&#xff0c;以便在相应的领域展示商品和提供服务。然而&#xff0c;有时候商家可能需要修改经营类目&#xff0c;以适应经营策略调整或扩大…

Linux进程(三)--进程切换命令行参数

继上回书Linux进程概念&#xff08;二&#xff09;--进程状态&进程优先级&#xff0c;我们在了解了Linux进程状态和优先级的概念&#xff0c;初步掌握了进程状态的相关知识&#xff0c;最终&#xff0c;我们以Linux进程的优先级&#xff0c;引出了一些其他的概念&#xff1…

Python---循环---while循环

Python中的循环 包括 while循环与for循环&#xff0c;本文以while循环为主。 Python中所有的知识点&#xff0c;都是为了解决某个问题诞生的&#xff0c;就好比中文的汉字&#xff0c;每个汉字都是为了解决某种意思表达而诞生的。 1、什么是循环 现实生活中&#xff0c;也有…

(N-128)基于springboot,vue酒店管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vueelementUI 服务端技术&#xff1a;springbootmybatis 本系统功…

vueday02——使用NTableData

1.下载naivueui 2.按需导入&#xff0c;不要全局导入 注意不要导入错误组件或者写错组件名称 import { NDataTable } from naive-ui 3.定义表头和数据&#xff01;&#xff01;&#xff01; n-data-table标签必须要使用数据和数据 少一个都不能正确渲染&#xff01;&#xf…

ATA-M4功率放大器都有哪些具体特点及优势

我们的ATA-M系列功率放大器&#xff0c;旨在将它打造为超越ATA-L系列水声功率放大器高频限制的系列产品。其中ATA-M4功率放大器是一款理想的单通道功率放大器。最大输出345Vrms电压&#xff0c;400VA功率&#xff0c;可驱动0~100%的阻性或非阻性负载。输出阻抗匹配多个档位可选…

GCC优化相关

文章目录 优化选项博文链接 单独设置某段代码优化等级博文链接 优化选项 -O/-O0:无优化(默认)-O1:使用能减少目标文件大小以及执行时间并且不会使编译时间明显增加的优化。该模式在编译大型程序的时候会花费更多的时间和内存。在-O1 下&#xff0c;编译会尝试减少代码体积和代码…

内核初始化的过程

内核的启动从入口函数 start_kernel() 开始。在 init/main.c 文件中&#xff0c;start_kernel 相当于内核的 main 函数。打开这个函数&#xff0c;你会发现&#xff0c;里面是各种各样初始化函数 XXXX_init。 在操作系统里面&#xff0c;先要有个创始进程&#xff0c;有一行指令…

JAVA发送消息到RabbitMq

项目中&#xff0c;作为生产者自定义消息发送到RabbitMq。 1.引入rmq依赖 <!-- rabbitmq 依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependen…

SystemVerilog Assertions应用指南 Chapter1.31 在属性中使用形参

可以用定义形参( formal arguments)的方式来重用一些常用的属性。属性“arb”使用了4个形参,并且根据这些形参进行检验。其中还定义了特定的时钟。SVA允许使用属性的形参来定义时钟。这样,属性可以应用在使用不同时钟的相似设计模块中。同样的,时序延迟也可以参数化,这使得属性…

cocos creator 在网页中调试的时候直接代码调试方法

cocos creator 在网页中调试的时候直接代码调试方法