flink 最后一个窗口一直没有新数据,窗口不关闭问题
- 自定义实现 WatermarkStrategy接口
自定义实现 WatermarkStrategy接口
代码:
public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);@Overridepublic WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<JSONObject>() {private long maxWatermark;@Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));state.f0 = System.currentTimeMillis();System.out.println("maxWatermark is " + maxWatermark);state.f1 = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime = 3000L;if (maxWatermark - outOfTime <=0){} else {// 10s内没有数据则关闭当前窗口System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));System.out.println("state.f1:" + state.f1);if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark + 6000L));state.f1 = true;System.out.println("触发窗口,maxWatermark + 6000L:" + (maxWatermark + 6000L));} else {System.out.println("正常发送水印");watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}
代码部分逻辑说明
若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);
使用自定义的watermark:
参考:https://blog.csdn.net/lr131425/article/details/127422833