Flink学习连载文章8--时间语义

Time的分类 (时间语义)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间

EventTime的重要性

假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分(EventTime)。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。
当你找到自己的车并且开出地下停车场的时候,已经是12点01分了(ProcessingTime)。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。在上面这个场景中你可以看到,
支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。还可以通过钉钉打卡、饭卡机 等 举例子。一条错误日志的内容为:
2020-11-11 23:59:58 error NullPointExcep --事件时间
进入Flink的时间为2020-11-11 23:59:59      --摄入时间
到达Window的时间为2020-11-12 00:00:01     --处理时间
问题:
对于业务来说,要统计每天的的故障日志个数,哪个时间是最有意义的?
答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质! 

总结:
1.事件时间确实重要, 因为它能够代表事件/数据的本质,是事件/数据真真正正发生/产生的时间
2.按照事件时间进去处理/计算,会存在一定的难度, 因为数据可能会因为网路延迟等原因, 发生乱序或延迟到达, 那么最后的计算结果就有可能错误或数据丢失
3.需要有技术来解决上面的问题,使用Watermark技术来解决! 

Watermark是什么?-水印,水位线

为什么会有WaterMark?

当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示

只要使用event time,就必须使用watermark,在上游指定,比如:source、map算子后.

Watermark的核心本质可以理解成一个延迟触发机制。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

Watermark就是给数据额外添加的一列时间戳!

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

假如明天出去玩,09:00集合,最多允许迟到10分钟。
08:50 胜赛来了    08:50 - 10 = 08:40 
09:05 步迅来了    09:05 - 10 = 08:55 
09:35 青林来了    watermark = 09:35 - 10 = 09:25
能否上到车上的条件是:watermark <= 时间点

Watermark能解决什么问题,如何解决的?

有了Watermark 就可以在一定程度上解决数据乱序或延迟达到问题!

不添加watermark ,窗口如何触发:

1)窗口有数据

2)窗口的结束时间到了。

班车:到了时间点立即发车,来了数据也不要。

有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:

1.窗口有数据

2.Watermark >= 窗口的结束时间

满足以上条件则触发窗口计算!

以前窗口触发:系统时间到了窗口结束时间就触发

现在窗口触发:Watermark >= 窗口的结束时间

而Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

就意味着, 通过Watermark改变了窗口的触发时机了, 那么接下来我们看如何改变的/如何解决前面的问题的

需要记住:

Watermark = 当前最大的事件时间 - 最大允许的延迟时间(或最大允许的乱序度时间)

窗口触发时机 : Watermark >= 窗口的结束时间

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间一些

窗口是否关闭,按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。

  • 水印并不会影响原有Eventtime
  • 当数据流添加水印后,会按照水印时间来触发窗口计算
  • 一般会设置水印时间,比Eventtime小一些(一般几秒钟)
  • 当接收到的水印时间>= 窗口的endTime且窗口内有数据,则触发计算

水印(水印时间)的计算:事件时间– 设置的水印长度 = 水印时间

比如,事件时间是10分30秒, 水印长度是2秒,那么水印时间就是10分28秒

Watermark图解原理

总结:

Watermark 是一个单独计算出来的时间戳
Watermark = 当前最大的事件时间 - 最大允许的延迟时间(乱序度)
Watermark可以通过改变窗口的触发时机 在 一定程度上解决数据乱序或延迟达到的问题
Watermark >= 窗口结束时间 时 就会触发窗口计算(窗口中得有数据)
延迟或乱序严重的数据还是丢失, 但是可以通过调大 最大允许的延迟时间(乱序度) 来解决, 或 使用后面要学习的侧道输出流来单独收集延迟或乱序严重的数据,保证数据不丢失!

多并行度的水印触发

在多并行度下,每个并行有一个水印

比如并行度是6,那么程序中就有6个watermark

分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5

其中5个并行度的水印都超过了5

但有一个并行度的水印是3

那么,不管另外5个并行度中的水印达到了多大,都不会触发

因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

在测试水印的时候,记得把并行度设置为1 ,好看结果,否则,结果不太容易看出来。

Watermark代码演示

需求

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

不使用水印的时候【不能使用eventtime时间语义】,进行开发:

package com.bigdata.day05;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-23 10:07:21** 实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)* 要求每隔5s,计算5秒内,每个用户的订单总金额* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。**/public class _01WatermarkDemo {@Data  // set get toString@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo2{private String orderId;private int uid;private int money;private long timeStamp;}public static class MySource implements SourceFunction<OrderInfo2> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo2 orderInfo = new OrderInfo2();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<OrderInfo2> orderSourceStream = env.addSource(new MySource());//3. transformation-数据处理转换// 每个用户的订单总额KeyedStream<OrderInfo2, Integer> keyedStream = orderDSWithWatermark.keyBy(orderInfo2 -> orderInfo2.getUid());SingleOutputStreamOperator<OrderInfo2> result1 = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money");result1.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

假如出现如下错误:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.day05.OrderInfo2>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:224)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:53)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:688)at com.bigdata.day05._01WatermarkDemo.main(_01WatermarkDemo.java:103)

说明这个pojo 必须是public 的,否则不解析。

修正过的代码:

package com.bigdata.time;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}
package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Date;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 10:30:28**/class MySource extends RichSourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {while(flag){OrderInfo orderInfo = new OrderInfo();Random random = new Random();int userId = random.nextInt(10);int money = random.nextInt(100);long timeStamp = System.currentTimeMillis() - random.nextInt(3000);orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(userId);orderInfo.setMoney(money);orderInfo.setTimeStamp(timeStamp);ctx.collect(orderInfo);Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}
}
public class _01_OrderDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

需求升级:

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

* 要求每隔5s,计算5秒内,每个用户的订单总金额

* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

假如你添加了 eventTime 缺没有添加水印的代码,会报如下错误:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:83)at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:302)

代码演示-开发版

生成 Watermark | Apache Flink

package com.bigdata.time;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-11-26 10:30:28**/public class _02_OrderDemoWithWaterMark {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MySource());//3. transformation-数据处理转换streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {// long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?@Overridepublic long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {// 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒return orderInfo.getTimeStamp();}})).keyBy(new KeySelector<OrderInfo, Integer>() {@Overridepublic Integer getKey(OrderInfo orderInfo) throws Exception {return orderInfo.getUid();}}).window(TumblingEventTimeWindows.of(Time.seconds(5)))//.sum("money").print();.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer userId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {String beginTime = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");String endTime = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");int sum = 0;for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(beginTime+","+endTime+","+userId +","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印,可以这样做:

DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

Flink对于迟到数据的处理

水印:对于迟到数据不长

allowedLateness: 迟到时间很长

侧道输出:对于迟到时间特别长

对于延迟数据的理解:

水印机制(水位线、watermark)机制可以帮助我们在短期延迟下,允许乱序数据的到来。

这个机制很好的处理了那些因为网络等情况短期延迟的数据,让窗口等它们一会儿。

但是水印机制无法长期的等待下去,因为水印机制简单说就是让窗口一直等在那里,等达到水印时间才会触发计算和关闭窗口

这个等待不能一直等,因为会一直缓着数据不计算。

一般水印也就是几秒钟最多几分钟而已(看业务)

那么,在现实世界中,延迟数据除了有短期延迟外,长期延迟也是很常见的。

比如:

l 客户端断网,等了好几个小时才恢复

l 车联网系统进入隧道后没有信号无法上报数据

l 手机欠费没有网

等等,这些场景下数据的迟到就不是简单的网络堵塞造成的几秒延迟了

而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理的。

那么有没有什么办法去处理这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。

这个场景的解决方式就是:延迟数据处理机制(allowedLateness方法)

水印:乱序数据处理(时间很短的延迟)

延迟处理:长期延迟数据的处理机制。

延迟数据的处理:

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,对于延迟的数据Flink也有自己的解决办法,

主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据

设置允许延迟的时间是通过allowedLateness(lateness: Time)设置

保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存

获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取

1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法

该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。

和watermark不同。

未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭

当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭

也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。

但是,不会关闭窗口。如果在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次

水印:短期延迟,达到条件后触发计算并且关闭窗口(触发+关闭同时进行)

水印+allowedLateness : 短期延迟+ 等待长期延迟效果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。

2) 侧输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开发人员可以自定逻辑对这些超级迟到数据进行处理。

处理主要使用两个方式:

对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方

对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理的数据的DataStream

注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继续写 如keyBy, window等处理逻辑。


使用方式:
先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
然后调用sideOutputLateData方法
// side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =allowedLateness.sideOutputLateData(outputTag);
用法:
DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
// 对得到的保存超级迟到数据的DataStream进行处理
sideOutput.print("late>>>");

代码演示-完美版/企业版

前面的案例中已经可以使用Watermark 来解决一定程度上的数据延迟和数据乱序问题

但是对于延迟/迟到/乱序严重的数据还是会丢失,所以接下来

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独收集延迟/迟到/乱序严重的数据,避免数据丢失!

package com.bigdata.day05;import com.bigdata.day04.OrderInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.UUID;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-15 17:06:04**/class MyOrderSource2 implements SourceFunction<OrderInfo> {@Overridepublic void run(SourceContext<OrderInfo> ctx) throws Exception {Random random = new Random();while(true){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));// 在这个地方可以模拟迟到数据long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);orderInfo.setOrdertime(orderTime);int money = random.nextInt(10);System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);orderInfo.setMoney(money);orderInfo.setUserId(random.nextInt(2));ctx.collect(orderInfo);Thread.sleep(500);}}@Overridepublic void cancel() {}
}
public class Demo01 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// 每隔五秒统计每个用户的前面5秒的订单的总金额//2. source-加载数据DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());//-2.告诉Flink最大允许的延迟时间/乱序时间为多少SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))//-3.告诉Flink哪一列是事件时间.withTimestampAssigner((order, time) -> order.getOrdertime()));OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};//3. transformation-数据处理转换SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(4)).sideOutputLateData(outputTag).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {@Overridepublic void apply(Integer key,  // 代表分组key值     五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out  // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;// 专门存放迟到的订单时间for (OrderInfo orderInfo : input) {sum += orderInfo.getMoney();}out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}});result.print("流中的数据,包含迟到的数据:");result.getSideOutput(outputTag).print("严重迟到的数据:");//4. sink-数据输出//5. execute-执行env.execute();}
}
订单产生的时间:2024-05-16 11:19:00,金额:1
订单产生的时间:2024-05-16 11:18:13,金额:3
严重迟到的数据:> f96d34c438ce400eb21a25328fe772ee,1,3,2024-05-16 11:18:13
订单产生的时间:2024-05-16 11:19:10,金额:3
2024-05-16 11:19:00
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:00,结束时间:2024-05-16 11:19:05,1,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:19,金额:8
严重迟到的数据:> cf85339600c647c99856841021466c5d,1,8,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:18:19,金额:9
严重迟到的数据:> f087ee9c9eaa4e3f9eac06a907b47fb4,1,9,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:17:48,金额:3
严重迟到的数据:> 22f48bb693874a99b80177f6764f0912,0,3,2024-05-16 11:17:48
订单产生的时间:2024-05-16 11:19:23,金额:1
2024-05-16 11:19:10
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:10,结束时间:2024-05-16 11:19:15,3,迟到的订单时间:
订单产生的时间:2024-05-16 11:19:19,金额:7
2024-05-16 11:19:19
流中的数据,包含迟到的数据:> 1,窗口开始:2024-05-16 11:19:15,结束时间:2024-05-16 11:19:20,7,迟到的订单时间:
订单产生的时间:2024-05-16 11:18:25,金额:2
严重迟到的数据:> 9b91cfd413784e4da9fb7f16b68186d0,0,2,2024-05-16 11:18:25
订单产生的时间:2024-05-16 11:18:48,金额:3
严重迟到的数据:> 62bdd621dd4d43b2b9f401949c1c5686,1,3,2024-05-16 11:18:48
订单产生的时间:2024-05-16 11:18:38,金额:1
严重迟到的数据:> e6fe834c88d043ef94b66853ad67dc46,1,1,2024-05-16 11:18:38
订单产生的时间:2024-05-16 11:18:39,金额:5
严重迟到的数据:> 472b35fc32744c39990d13bd490ae131,1,5,2024-05-16 11:18:39
订单产生的时间:2024-05-16 11:18:19,金额:0
严重迟到的数据:> 49270e8cf00445f38a4fbcfebedfdc0a,0,0,2024-05-16 11:18:19
订单产生的时间:2024-05-16 11:19:07,金额:8
严重迟到的数据:> a337af0e6f1c46e48e2ed2ec99d6dc53,0,8,2024-05-16 11:19:07
订单产生的时间:2024-05-16 11:19:22,金额:8
订单产生的时间:2024-05-16 11:19:01,金额:3
严重迟到的数据:> 5bf845744c9d486f97fbd8106deb22bb,0,3,2024-05-16 11:19:01
订单产生的时间:2024-05-16 11:18:42,金额:2
严重迟到的数据:> f861897e49a54b589863efd6866ce61f,0,2,2024-05-16 11:18:42
订单产生的时间:2024-05-16 11:18:15,金额:7
严重迟到的数据:> 37a8d94dc8b94854963ddcbefa3d00dc,0,7,2024-05-16 11:18:15
订单产生的时间:2024-05-16 11:17:56,金额:6
严重迟到的数据:> ab81cf409c604bb398d43b15f59d7e53,1,6,2024-05-16 11:17:56
订单产生的时间:2024-05-16 11:18:24,金额:3
严重迟到的数据:> f85598988f4b43599e719b873d930440,1,3,2024-05-16 11:18:24
订单产生的时间:2024-05-16 11:17:52,金额:2
严重迟到的数据:> 03ade6f3972d441289bd745f979c961a,1,2,2024-05-16 11:17:52
订单产生的时间:2024-05-16 11:19:20,金额:9
订单产生的时间:2024-05-16 11:18:07,金额:1
严重迟到的数据:> d8b8992ea2b74a5fb10bffb198917c8f,0,1,2024-05-16 11:18:07
订单产生的时间:2024-05-16 11:18:34,金额:2
严重迟到的数据:> a8855d90701449588efc4dfd48df1dd3,0,2,2024-05-16 11:18:34
订单产生的时间:2024-05-16 11:19:11,金额:3
严重迟到的数据:> a7e245864b2c4c03b463b121277309eb,0,3,2024-05-16 11:19:11
订单产生的时间:2024-05-16 11:18:11,金额:9
严重迟到的数据:> cb99f1b5c1e24e5bb6ccacb9f0f9ea78,0,9,2024-05-16 11:18:11
订单产生的时间:2024-05-16 11:18:36,金额:7Process finished with exit code 130

虽然我们添加了延迟的效果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。

只考虑 1 个并行度的问题

订单发生的真实事件:窗口5秒,间隔5秒,允许迟到 3秒 最晚允许迟到4秒

10:44:00 第一个区间就应该是10:44:00 10:44:05

10:44:01

10:44:02

10:44:03

10:44:04

10:44:05

10:44:07 第一个区间就应该是10:44:05 10:44:10

10:44:22 第一个区间就应该是10:44:20 10:44:25

10:44:30

10:44:28

10:44:20

通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10

44:22 一个数据触发了两个区间的执行 00~05 05~10

假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/481357.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义类型: 结构体、枚举 、联合

目录 结构体 结构体类型的声明 匿名结构体 结构的自引用 结构体变量的定义和初始化 结构体成员变量的访问 结构体内存对齐 结构体传参 位段 位段类型的声明 位段的内存分配 位段的跨平台问题 位段的应用 枚举 枚举类型的定义 枚举的优点 联合体(共用体) 联合…

【WPS】【EXCEL】将单元格中字符按照分隔符拆分按行填充到其他单元格

问题&#xff1a;实现如下图的效果 解答&#xff1a; 一、函数 IFERROR(TRIM(MID(SUBSTITUTE($A$2,",",REPT(" ",LEN($A$2))),(ROW(A1)-1)*LEN($A$2)1,LEN($A$2))),"") 二、在单元格C2中填写如下函数 三、全选要填充的单元格并且按CTRLD 函数…

BiGRU:双向门控循环单元在序列处理中的深度探索

一、引言 在当今的人工智能领域&#xff0c;序列数据的处理是一个极为重要的任务&#xff0c;涵盖了自然语言处理、语音识别、时间序列分析等多个关键领域。循环神经网络&#xff08;RNN&#xff09;及其衍生结构在处理序列数据方面发挥了重要作用。然而&#xff0c;传统的 RN…

卸载 Archiconda

一、卸载创建的虚拟环境 # 1.查看所创建的虚拟环境 conda env list# 2.一 一删除创建的虚拟环境&#xff0c;name 替换为自己创建的虚拟环境的名字 conda remove --name name --all二、卸载archidonda rm -rf ~/archiconda3三、删除conda的环境变量 外链图片转存失败,源站可…

【Java基础面试题001】Java中序列化和反序列化是什么?

在Java中&#xff0c;序列化和反序列化是用于将对象的状态保存和恢复的重要机制。 序列化 是将Java对象转换为字节流的过程&#xff0c;这样Java对象才可以网络传输、持久化存储还有缓存。Java提供了java.io.Serializable接口来支持序列化&#xff0c;只要类实现了这个接口&a…

前端学习week8——vue.js

Vue.js 基础 Vue 核心概念&#xff1a;了解 Vue 的响应式系统、组件、指令&#xff08;如 v-if、v-for、v-model 等&#xff09;。Vue 项目管理&#xff1a;学习 Vue CLI 或 Vite&#xff0c;掌握项目创建、管理和打包。推荐学习顺序&#xff1a;Vue 基础 → 组件化开发 → Vu…

Excel如何限制单元格内可选择的下拉框内容?

先选择想要的表格区域&#xff1a; 如果想要选中如下所示&#xff1a;C2格子及其下面所有的格子&#xff08;则&#xff1a;点击一下C2格子&#xff0c;然后按一下键盘&#xff1a;SHIFT CTRL ↓&#xff09; 然后在【sheet2】表&#xff0c;先填写好下拉框可选择的内容&am…

uniapp实现列表页面,实用美观

咨询列表页面 组件 <template><view><view class"news_item" click"navigator(item.id)" v-for"item in list" :key"item.id"><image :src"item.img_url"></image><view class"righ…

Linux学习笔记11 系统启动初始化,服务和进程管理(下)

前文 前文介绍了系统启动初始化程序&#xff0c;介绍了systemd的基础知识。这里主要看一下我们systemd的单元管理和常用的命令以及示例。 Linux学习笔记10 系统启动初始化&#xff0c;服务和进程管理&#xff08;上&#xff09;-CSDN博客 systemd单元管理 启动服务 这很常…

哈希表,哈希桶的实现

哈希概念 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找一个元素 时&#xff0c;必须要经过关键码的多次比较。顺序查找时间复杂度为O(N)&#xff0c;平衡树中为树的高度&#xff0c;即 O(logN)&#xff0c;搜索的效率取决…

Maven install java heap space

Maven install java heap space 打包报错 Maven install java heap space 解决&#xff1a; vm option: -Xms1024m -Xmx1024m如果 vm配置了&#xff0c;还是一样报错&#xff0c;就重新选择JRE看看是否正确&#xff0c;idea会默认自己的环境&#xff0c;导致设置vm无效&…

aws(学习笔记第十五课) 如何从灾难中恢复(recover)

aws(学习笔记第十五课) 如何从灾难中恢复 学习内容&#xff1a; 使用CloudWatch对服务器进行监视与恢复区域(region)&#xff0c;可用区(available zone)和子网(subnet)使用自动扩展(AutoScalingGroup) 1. 使用CloudWatch对服务器进行监视与恢复 整体架构 这里模拟Jenkins Se…

【Maven】依赖管理

4. Maven的依赖管理 在 Java 开发中&#xff0c;项目的依赖管理是一项重要任务。通过合理管理项目的依赖关系&#xff0c;我们可以有效的管理第三方库&#xff0c;模块的引用及版本控制。而 Maven 作为一个强大的构建工具和依赖管理工具&#xff0c;为我们提供了便捷的方式来管…

go语言的成神之路-筑基篇-中间件

目录 单个Gin中间件 中间件简要概述 一、中间件的定义&#xff1a; 二、中间件的使用&#xff1a; 效果展示 多个Gin中间件 示例 Abort阻止后续处理函数 执行流程图 return直接返回 执行流程图 全局注册中间件 注意事项 单个Gin中间件 中间件简要概述 在 gin 框架中…

Xilinx PCIe高速接口入门实战(一)

引言&#xff1a;本文对Xilinx 7 Series Intergrated Block for PCI Express PCIe硬核IP进行简要介绍&#xff0c;主要包括7系列FPGA PCIe硬核资源支持、三IP硬核差异、PCIe硬核资源利用等相关内容。 1. 概述 1.1 7系列FPGA PCIe硬件资源支持 7系列FPGA对PCIe接口最大支持如…

【第三讲】Spring Boot 3.4.0 新特性详解:增强的配置属性支持

Spring Boot 3.4.0 版本在配置属性的支持上进行了显著增强&#xff0c;使得开发者能够更灵活地管理和使用应用程序的配置。新的特性包括对配置属性的改进、类型安全增强、以及对环境变量的更好支持。这些改进旨在提升开发效率和代码可读性&#xff0c;同时简化配置过程。本文将…

如何使用 Chrome 无痕浏览模式访问网站?

无痕浏览&#xff08;Incognito Mode&#xff09;是 Google Chrome 浏览器提供的一种隐私保护功能&#xff0c;它允许用户在一个独立的会话中浏览网页&#xff0c;而不会记录用户的浏览历史、下载历史、表单数据等。这对于希望保护个人隐私或进行临时性匿名浏览的用户来说非常有…

拥抱 OpenTelemetry:阿里云 Java Agent 演进实践

作者&#xff1a;陈承 背景 在 2018 年的 2 月&#xff0c;ARMS Java Agent 的第一个版本正式发布&#xff0c;为用户提供无侵入的的可观测数据采集服务。6 年后的今天&#xff0c;随着软件技术的迅猛发展、业务场景的逐渐丰富、用户规模的快速增长&#xff0c;我们逐渐发现过…

AI数据分析工具(二)

豆包-免费 优点 强大的数据处理能力&#xff1a; 豆包能够与Excel无缝集成&#xff0c;支持多种数据类型的导入&#xff0c;包括文本、数字、日期等&#xff0c;使得数据整理和分析变得更加便捷。豆包提供了丰富的数据处理功能&#xff0c;如数据去重、填充缺失值、转换格式等…

C/C++ 数据结构与算法 【时间复杂度和空间复杂度】【日常学习,考研必备】

一、时间复杂度 定义&#xff1a;时间复杂度描述了算法运行时间随输入大小增长而增长的趋势。它主要关注的是算法中最耗时的部分&#xff0c;并忽略常数因子、低阶项等细节。表示方法&#xff1a;通常使用大O符号&#xff08;Big O notation&#xff09;来表示时间复杂度。例如…