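Big Data with Flink (Part 4)

11. Watermarks

11.1. Watermark concepts

In a typical real-time stream processing scenario, event time stays roughly in sync with processing time, possibly with a slight delay.

In Flink, the marker used to measure the progress of event time is the watermark. A watermark can be viewed as a special record inserted into the data stream as a marker. Its main content is a timestamp that indicates the current event time. Usually the timestamp of some data record is used as the watermark's timestamp.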

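Watermark properties:

  • A watermark is a marker inserted into the data stream
  • Its main content is a timestamp that represents the progress of event time
  • Watermarks are generated from the timestamps of the data
  • Watermark timestamps increase monotonically
  • A watermark can be given a delay so that out-of-order data is handled correctly
  • A watermark WaterMark(t) means that event time in the current stream has reached timestamp t: all data up to t is assumed to have arrived, and no record with a timestamp less than or equal to t should appear in the stream afterwards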

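Take a watermark that waits 2 s as an example:
[Figure: watermark with a 2 s wait]
**Note:** Flink windows are not prepared statically; they are created dynamically. A window is created only when the first record that falls into its time range arrives. When the watermark reaches the window end time, the window fires its computation and closes. Firing and closing are nevertheless two separate actions.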
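
The window arithmetic behind this note can be tried out in plain Java, without Flink on the classpath. This is a minimal sketch, assuming tumbling windows with no offset and non-negative millisecond timestamps. The names `TumblingWindowSketch`, `windowStart` and `shouldFire` are hypothetical and only mirror the arithmetic; they are not Flink's API.

```java
final class TumblingWindowSketch {
    /** Start of the tumbling window [start, start + size) that contains ts (ts >= 0, no offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** An event-time window can fire once the watermark reaches its last timestamp, end - 1. */
    static boolean shouldFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L;                      // 10 s windows, in milliseconds
        long start = windowStart(12_000L, size);  // a record with event time 12 s
        System.out.println("[" + start + "," + (start + size) + ")");
        System.out.println(shouldFire(9_999L, 10_000L));
    }
}
```

The window for a record is derived from the record's own timestamp when the record arrives, which is why no window exists before its first record shows up.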

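11.2. Generating watermarks
11.2.1. Principles

For performance, use a small watermark delay or no watermark at all. Processing-time semantics give the lowest latency, but data may be missed.

To make sure all data has arrived, use a large watermark delay, at the cost of performance: results are delayed.

11.2.2. Built-in watermarks

1. Built-in watermark for ordered streams

Call forMonotonousTimestamps() directly: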

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: ascending timestamps, no wait time
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("data=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}

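2. Built-in watermark for out-of-order streams

Set the wait time to 2 seconds, so the [0,10) window closes when a record with event time 12 s arrives.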

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream, with a 2 s wait time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("data=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}

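Result:
[Figure: program output]
As the output shows, the record WaterSensor{id='s1', ts=12, vc=12} closes the [0,10) window. It is logged with recordTS=-9223372036854775808, which is Long.MIN_VALUE and means that no timestamp had been attached to the record before. The record itself does not belong to [0,10); it falls into [10,20).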

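11.2.3. How built-in watermarks are generated
  • Both are generated periodically; the default interval is 200 ms
  • Ordered stream: watermark = current maximum event time - 1 ms
  • Out-of-order stream: watermark = current maximum event time - delay - 1 ms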
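
The two formulas above differ only in the delay, so one small class can illustrate both. This is a sketch in plain Java rather than Flink's actual generator: the class name `BuiltInWatermarkSketch` and its methods are hypothetical, timestamps are in milliseconds, and the periodic 200 ms emission is left out.

```java
final class BuiltInWatermarkSketch {
    private final long delayMillis;
    private long maxTimestamp;

    /** delayMillis = 0 models an ordered stream, delayMillis > 0 an out-of-order stream. */
    BuiltInWatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
        // Start low enough that the subtraction in currentWatermark() cannot overflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    /** Tracks the largest event time seen so far; smaller timestamps never move it back. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** watermark = current maximum event time - delay - 1 ms */
    long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BuiltInWatermarkSketch bounded = new BuiltInWatermarkSketch(2_000L);
        bounded.onEvent(12_000L);
        System.out.println(bounded.currentWatermark());
    }
}
```

Because only the maximum event time is tracked, an out-of-order record with a smaller timestamp leaves the watermark unchanged, which keeps it monotonically increasing.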
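11.3. Watermark propagation
11.3.1. Watermark propagation with multiple parallel subtasks

A task takes the minimum of the watermarks received from its upstream subtasks. Otherwise windows would close too early and data would be lost.
[Figure: watermark propagation across parallel subtasks]
Demonstration of watermark propagation with multiple parallel subtasks: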

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 2, to demonstrate watermark propagation across parallel subtasks
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream, with a 2 s wait time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("data=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}

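Result:
[Figure: program output]
With multiple parallel subtasks there is an additional watermark update step. When WaterSensor{id='s1', ts=12, vc=12} arrives, one subtask's watermark is 5-2=3 and the other's is 12-2=10 (ignoring the 1 ms). Because the minimum is taken, the watermark stays at 3 instead of advancing to 10. When WaterSensor{id='s1', ts=13, vc=13} arrives, the watermark advances to 10 and the window closes.

Conclusion: with multiple parallel subtasks, the window closes only when the record after the one that would trigger the watermark arrives.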
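
The "take the minimum" rule can be replayed with the numbers from this walkthrough. This is a simplified sketch in plain Java, not Flink's internal implementation: the class `MinWatermarkSketch` and its method are hypothetical, and the watermark values include the 1 ms that the text leaves out (2999, 9999, 10999).

```java
import java.util.Arrays;

final class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long emitted = Long.MIN_VALUE;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream channel and returns the task's own watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        emitted = Math.max(emitted, min);   // the task's watermark never moves backwards
        return emitted;
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(2);
        task.onWatermark(0, 2_999L);                       // record ts=5 on subtask 0
        System.out.println(task.onWatermark(1, 9_999L));   // record ts=12 on subtask 1
        System.out.println(task.onWatermark(0, 10_999L));  // record ts=13 on subtask 0
    }
}
```

Only the third call lifts the minimum to 9999, the last timestamp of the [0,10) window, which matches the observation that the window closes one record later than with parallelism 1.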

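11.3.2. Watermark idleness

With several parallel upstream tasks, if one of them receives no data, the downstream task's watermark cannot advance, because the task uses the minimum upstream watermark as its event-time clock. Its windows then never fire. Setting an idleness timeout solves this.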

package waterMark;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        SingleOutputStreamOperator<Integer> streamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                // Custom partitioner: value % number of partitions.
                // If only odd numbers are sent, they all go to the same subtask
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, value -> value)
                .map(value -> Integer.parseInt(value))
                .assignTimestampsAndWatermarks(
                        // Watermark strategy
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                // Idleness timeout of 5 s
                                .withIdleness(Duration.ofSeconds(5)));

        // Two groups, odd and even, each with a 2 s tumbling event-time window
        streamOperator
                .keyBy(value -> value % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + integer + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                })
                .print();

        env.execute();
    }
}
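
Why an idle upstream blocks the watermark, and how marking it idle unblocks it, can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's implementation: the class `IdleAwareWatermarkSketch` and its methods are hypothetical, and the timeout itself is not modelled (the caller decides when a channel counts as idle).

```java
import java.util.Arrays;

final class IdleAwareWatermarkSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long emitted = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(int channels) {
        watermarks = new long[channels];
        idle = new boolean[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** A watermark from a channel also marks that channel as active again. */
    void onWatermark(int channel, long watermark) {
        idle[channel] = false;
        watermarks[channel] = Math.max(watermarks[channel], watermark);
    }

    /** Called when a channel has produced nothing for longer than the idleness timeout. */
    void markIdle(int channel) {
        idle[channel] = true;
    }

    /** Minimum over the active channels only; unchanged if every channel is idle. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            emitted = Math.max(emitted, min);
        }
        return emitted;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch task = new IdleAwareWatermarkSketch(2);
        task.onWatermark(1, 4_999L);                   // only subtask 1 receives data
        System.out.println(task.currentWatermark());   // still blocked by subtask 0
        task.markIdle(0);
        System.out.println(task.currentWatermark());   // subtask 0 no longer counts
    }
}
```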
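11.4. Handling late data
11.4.1. Delaying the watermark

Configure an out-of-orderness tolerance when the watermark is generated. This delays the advance of event time, so window computation is postponed and out-of-order data has more time to enter the window.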

forBoundedOutOfOrderness(Duration.ofSeconds(2))
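11.4.2. Allowed lateness on windows

Flink windows can accept late data. When the window fires, it computes and emits the current result but does not close. After that, every late record that arrives triggers the window computation again (an incremental update). The window closes only when the watermark passes the window end time plus the allowed lateness.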

.allowedLateness(Time.seconds(2))

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream, with a 2 s wait time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("data=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Keep the window open for 2 more seconds
                .allowedLateness(Time.seconds(2))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
11.4.3. Receiving late data in a side output

Use .sideOutputLateData() to send late data to a side output stream.

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream, with a 2 s wait time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        System.out.println("data=" + element + "recordTS=" + recordTimestamp);
                                        // The returned timestamp must be in milliseconds
                                        return element.getTs() * 1000L;
                                    }
                                }));

        OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Keep the window open for 2 more seconds
                .allowedLateness(Time.seconds(2))
                // Late data that arrives after the window has closed goes to the side output
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + "] contains " + l + " records " + elements.toString());
                    }
                });

        process.print();
        process.getSideOutput(outputTag).print("side output");
        env.execute();
    }
}
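
How the watermark delay, the allowed lateness and the side output interact for one window can be expressed as a three-way decision. This is a minimal sketch in plain Java with hypothetical names (`LatenessSketch`, `classify`, `Outcome`); it models a single record arriving for a given window and is not the window operator's actual code.

```java
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * Classifies a record that belongs to the window ending at windowEnd,
     * given the watermark at the moment the record arrives (all in milliseconds).
     */
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;   // last timestamp that belongs to the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;              // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED;    // fired already, but still open: result is updated
        }
        return Outcome.SIDE_OUTPUT;              // the window is closed
    }

    public static void main(String[] args) {
        // Window [0,10 s), allowed lateness 2 s, watermark delay 2 s as in the demo above.
        System.out.println(classify(10_000L, 2_000L, 9_999L));   // after a record with ts=12
        System.out.println(classify(10_000L, 2_000L, 11_999L));  // after a record with ts=14
    }
}
```

With these settings a record with ts=7 is still merged into the [0,10) window if it arrives after ts=12, and goes to the side output if it arrives after ts=14.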
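11.4.4. Summary
  1. Out-of-order vs. late:

    **Out-of-order:** the order of the data is disturbed: a record with an earlier event time arrives after a record with a later event time

    **Late:** the record's timestamp < the current watermark

  2. Handling out-of-order and late data

    • Specify an out-of-orderness wait time in the watermark strategy
    • If windows are used, set allowed lateness on the window
    • Send data that arrives after the window has closed to a side output
  3. The watermark wait time and the window's allowed lateness are not equivalent and cannot replace each other

    The watermark determines when a window first fires. A long watermark wait time increases result latency.

    Allowed lateness only makes the result more accurate and should not affect result latency.

    So the two cannot substitute for each other.

  4. Rules of thumb for setting the watermark wait time and the allowed lateness

    Do not make the watermark wait time too large; it is usually in the range of seconds. Size the allowed lateness for the bulk of the late data only. Use a side output for the few extremely late records.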

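12. Time-based stream joins

The connect operation described earlier covers most needs. To find matches between two streams within a fixed period of time, however, connect requires custom logic. A window expresses this more simply, and Flink provides a built-in API for it.

12.1. Window Join
  1. Records match only if they fall into the same time window
  2. Records are matched on the keyBy key
  3. Only matched records are returned, similar to an inner join with a fixed time range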

package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Title: WindowJoinDemo
 * @Author lizhe
 * @Date 2024/6/8 21:11
 */
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called for every pair of records that join
                     * @param first  record from ds1
                     * @param second record from ds2
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "----" + second;
                    }
                });

        join.print();
        env.execute();
    }
}
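
The matching rule of a window join (same key and same window) can be replayed on the demo data in plain Java. This is a sketch under stated assumptions: timestamps are whole seconds and non-negative, the window is a 5 s tumbling window, and the names `WindowJoinSketch`, `Event`, `join` and `demo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowJoinSketch {
    static final class Event {
        final String key;
        final long ts;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    /** Pairs records with the same key whose timestamps fall into the same tumbling window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameWindow = l.ts / windowSize == r.ts / windowSize;
                if (l.key.equals(r.key) && sameWindow) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    /** The two input streams of WindowJoinDemo, with timestamps in seconds. */
    static List<String> demo() {
        List<Event> ds1 = Arrays.asList(
                new Event("a", 1), new Event("a", 2), new Event("b", 3), new Event("c", 4));
        List<Event> ds2 = Arrays.asList(
                new Event("a", 1), new Event("a", 11), new Event("b", 2),
                new Event("b", 12), new Event("c", 14), new Event("d", 15));
        return join(ds1, ds2, 5);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Key c appears in both streams but in different windows (4 s and 14 s), so it produces no output, which is the inner-join behaviour described in the list above.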
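12.2. Interval Join

Sometimes the time range to match is not fixed. Two records that should match may sit just on either side of a window boundary, so the match fails. A window join therefore cannot cover this case.

An interval join opens, for each record of one stream, a time interval around that record's timestamp. The interval is given as lower and upper bound offsets: a negative offset reaches back in time, a positive one forward. Flink then checks whether matching records from the other stream fall inside this interval. (Only event-time semantics are supported.)
[Figure: interval join]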

package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author lizhe
 * @Date 2024/6/8 21:11
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);

        stream1.intervalJoin(stream2)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called only when records from both streams match
                     * @param left  record from stream1
                     * @param right record from stream2
                     * @param ctx   context
                     * @param out   collector
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        // Only joined records reach this method
                        out.collect(left + "----" + right);
                    }
                })
                .print();

        env.execute();
    }
}
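
The interval condition itself is a single comparison. The sketch below states it in plain Java, assuming both bounds are inclusive (Flink's default for between()) and timestamps in milliseconds. The class `IntervalJoinSketch` and the method `matches` are hypothetical names used for illustration only.

```java
final class IntervalJoinSketch {
    /**
     * True if the right record lies inside the interval opened around the left record:
     * leftTs + lowerBound <= rightTs <= leftTs + upperBound.
     */
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        long lower = -2_000L;   // between(Time.seconds(-2), Time.seconds(2))
        long upper = 2_000L;
        System.out.println(matches(3_000L, 2_000L, lower, upper));   // (b,3) with (b,2,1)
        System.out.println(matches(4_000L, 14_000L, lower, upper));  // (c,4) with (c,14,1)
    }
}
```

Unlike the window join, the interval is centred on each left record, so two records 1 s apart always match even if a window boundary falls between them.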

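Since version 1.17, late records can be emitted through side outputs.

A record whose event time < the current watermark is late, and the process function on the main stream does not handle it.

Calling sideOutputLeftLateData() or sideOutputRightLateData() after between() sends the late records to a side output stream.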

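13. Process functions

Process functions are the lower-level API beneath DataStream. They are collectively called the process operator, and their interface is the process function.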

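13.1. Basic process functions

A process function can access the events in the stream, their timestamps and the watermark, and through its timer service (TimerService) it can even register timers. Process functions extend AbstractRichFunction, so they have all the features of rich functions and can access state and other runtime information. They can also emit data directly to side outputs. Process functions are the most flexible way to process data and can implement all kinds of custom logic.

Categories: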

  1. ProcessFunction
  2. KeyedProcessFunction
  3. ProcessWindowFunction
  4. ProcessAllWindowFunction
  5. CoProcessFunction
  6. ProcessJoinFunction
  7. BroadcastProcessFunction
  8. KeyedBroadcastProcessFunction
13.2. KeyedProcessFunction

Timers can be set through the TimerService only on a KeyedStream.

13.2.1. Timers and the timer service
keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
    /**
     * Called once for each record
     */
    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
        // Event time extracted from the record; null if there is none
        Long timestamp = ctx.timestamp();
        // Timer service
        TimerService timerService = ctx.timerService();
        // Register a timer: event time
        timerService.registerEventTimeTimer(10L);
        // Register a timer: processing time
        timerService.registerProcessingTimeTimer(10L);
        // Delete a timer: event time
        timerService.deleteEventTimeTimer(10L);
        // Delete a timer: processing time
        timerService.deleteProcessingTimeTimer(10L);
        // Current processing time, i.e. system time
        timerService.currentProcessingTime();
        // Current watermark
        timerService.currentWatermark();
    }
});

Event-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Date 2024/6/9 12:29
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for each record
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // Event time extracted from the record; null if there is none
                Long timestamp = ctx.timestamp();
                // Timer service
                TimerService timerService = ctx.timerService();
                // Register a timer: event time
                timerService.registerEventTimeTimer(5000L);
                System.out.println("current time " + timestamp + ", registered a timer for 5 s");
            }

            /**
             * Called when time advances to the time a timer was registered for
             * @param timestamp the time the firing timer was registered for
             * @param ctx       context
             * @param out       collector
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("time is now " + timestamp + ", timer fired, key is " + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}

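Output:
[Figure: program output]
The TimerService deduplicates timers by key and timestamp: for each key and timestamp there is at most one timer. Even if it is registered several times, onTimer() is called only once.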
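
The deduplication rule amounts to keeping timers in a set keyed by (key, timestamp). The following sketch shows that idea in plain Java; `TimerDedupSketch` and its methods are hypothetical names, and Flink's real timer storage is more involved than a `HashSet`.

```java
import java.util.HashSet;
import java.util.Set;

final class TimerDedupSketch {
    private final Set<String> timers = new HashSet<>();

    /** Returns true only if this (key, timestamp) pair had not been registered before. */
    boolean register(String key, long timestamp) {
        return timers.add(key + "@" + timestamp);
    }

    /** Number of distinct timers, i.e. how many onTimer() calls are still to come. */
    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerDedupSketch service = new TimerDedupSketch();
        service.register("s1", 5_000L);
        service.register("s1", 5_000L);   // same key and timestamp: ignored
        service.register("s2", 5_000L);   // different key: a separate timer
        System.out.println(service.pendingTimers());
    }
}
```

This is why the event-time demo above, which registers a 5 s timer for every incoming record, still fires only once per key.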

Processing-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Date 2024/6/9 12:29
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for each record
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // Timer service
                TimerService timerService = ctx.timerService();
                long currentProcessingTime = timerService.currentProcessingTime();
                // Register a processing-time timer 5 s from now
                timerService.registerProcessingTimeTimer(currentProcessingTime + 5000L);
                System.out.println("current time " + currentProcessingTime + ", registered a timer 5 s from now, key is " + ctx.getCurrentKey());
            }

            /**
             * Called when time advances to the time a timer was registered for
             * @param timestamp the time the firing timer was registered for
             * @param ctx       context
             * @param out       collector
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("time is now " + timestamp + ", timer fired, key is " + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}

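Summary:

  1. Event-time timers are triggered by the watermark, when watermark >= the registered time.

    Note:

    Watermark = current maximum event time - wait time - 1 ms. Because of the 1 ms, firing is pushed back by one record. Take a 5 s timer with a wait time of 3 s: a record at 8 s gives a watermark of 8 s - 3 s - 1 ms = 4999 ms, which does not fire the timer. A record at 9 s gives 9 s - 3 s - 1 ms = 5999 ms, which does.

  2. The current watermark read inside the process function is the previous watermark, because the process function has not yet received the new watermark generated from the record being processed.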
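
The arithmetic in note 1 can be checked directly. A minimal sketch in plain Java, with hypothetical names (`TimerFiringSketch`, `watermark`, `fires`) and all values in milliseconds:

```java
final class TimerFiringSketch {
    /** watermark = current maximum event time - wait time - 1 ms */
    static long watermark(long maxEventTime, long waitTime) {
        return maxEventTime - waitTime - 1;
    }

    /** An event-time timer fires once the watermark has reached its registered time. */
    static boolean fires(long timerTime, long watermark) {
        return watermark >= timerTime;
    }

    public static void main(String[] args) {
        long timer = 5_000L;
        long wait = 3_000L;
        System.out.println(watermark(8_000L, wait) + " " + fires(timer, watermark(8_000L, wait)));
        System.out.println(watermark(9_000L, wait) + " " + fires(timer, watermark(9_000L, wait)));
    }
}
```

With whole-second test data the record at 9 s is the first one that fires the timer. With millisecond-resolution data, any record with an event time of at least 8001 ms would already be enough.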

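13.3. Application example

Find the water levels that occur most often within a period of time: the two most frequent water levels within 10 s, updated every 5 s.

A sliding window can be used to count the records per water level.

Note: this example needs a closer look later; it may contain problems.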

package process;import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.*;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> operator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        // 1. Key by vc, then window and aggregate (incremental count + full-window function that adds a label).
        // After window aggregation the result is an ordinary stream with no window information,
        // so each record is tagged with its window end time (windowEnd) by hand.
        // Note: these are processing-time windows, so they are not driven by the watermarks assigned above.
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator
                .keyBy(value -> value.getVc())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());

        // 2. Key by the window label so that results of the same window end up together, then sort and take the Top N.
        aggregate.keyBy(value -> value.f2)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            // Only called for merging windows (e.g. session windows); combine the partial counts.
            return a + b;
        }
    }

    /**
     * Type parameters:
     * 1st: input type = output of the incremental function (the count)
     * 2nd: output type = Tuple3(vc, count, windowEnd), i.e. tagged with the window end time
     * 3rd: key type (vc, Integer)
     * 4th: window type
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            Integer count = elements.iterator().next();
            long windowsEnd = context.window().getEnd();
            out.collect(Tuple3.of(key, count, windowsEnd));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        // Results of the different windows: key = windowEnd, value = list of results
        private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
        // How many top entries to emit
        private int threshold;

        public TopN(int threshold) {
            dataListMap = new HashMap<>();
            this.threshold = threshold;
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            // Each call receives a single record. To sort, records must be buffered, and separately per window.
            // 1. Store the record in the HashMap
            Long windowEnd = value.f2;
            if (dataListMap.containsKey(windowEnd)) {
                // 1.1 This windowEnd is already present: not the first record of the window, append to its list
                List<Tuple3<Integer, Integer, Long>> tuple3List = dataListMap.get(windowEnd);
                tuple3List.add(value);
            } else {
                // 1.2 This windowEnd is not present: first record of the window, initialize its list
                List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd, dataList);
            }
            // 2. Register a timer at windowEnd + 1 ms. Results of one window are emitted together but arrive
            //    through separate processElement calls, so a 1 ms delay is enough.
            ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // Timer fired: the results of this window have been collected; sort them and take the Top N.
            Long windowEnd = ctx.getCurrentKey();
            // 1. Sort by count, descending
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
            dataList.sort((o1, o2) -> Integer.compare(o2.f1, o1.f1));
            // 2. Take the Top N
            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            // Walk the sorted list and take the first `threshold` entries;
            // if the list has fewer than `threshold` entries, take dataList.size().
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top" + (i + 1) + "\n");
                outStr.append("vc=" + vcCount.f0 + "\n");
                outStr.append("count=" + vcCount.f1 + "\n");
                outStr.append("window end time=" + vcCount.f2 + "\n");
            }
            // Clean up the list as soon as it has been used, and drop its map entry
            dataList.clear();
            dataListMap.remove(windowEnd);
            out.collect(outStr.toString());
        }
    }
}
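
The second stage of the job above (group the tagged results by windowEnd, sort by count, keep the first N) can be tried out without a Flink cluster. The following is only a minimal plain-Java sketch of that ranking step; the class `TopNSketch` and its nested `VcCount` type are hypothetical stand-ins for the `Tuple3<vc, count, windowEnd>` records, not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the Top-N stage; no Flink types involved.
class TopNSketch {
    // Hypothetical value type mirroring Tuple3<vc, count, windowEnd>.
    static class VcCount {
        final int vc;
        final int count;
        final long windowEnd;

        VcCount(int vc, int count, long windowEnd) {
            this.vc = vc;
            this.count = count;
            this.windowEnd = windowEnd;
        }
    }

    // Groups results by windowEnd, sorts each group by count (descending), keeps at most n entries.
    static Map<Long, List<VcCount>> topNPerWindow(List<VcCount> results, int n) {
        Map<Long, List<VcCount>> byWindow = new TreeMap<>();
        for (VcCount r : results) {
            byWindow.computeIfAbsent(r.windowEnd, k -> new ArrayList<>()).add(r);
        }
        Map<Long, List<VcCount>> top = new TreeMap<>();
        for (Map.Entry<Long, List<VcCount>> e : byWindow.entrySet()) {
            List<VcCount> sorted = new ArrayList<>(e.getValue());
            sorted.sort((a, b) -> Integer.compare(b.count, a.count));
            top.put(e.getKey(), new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return top;
    }
}
```

In the real job the grouping is done by `keyBy(value -> value.f2)` and the "window is complete" signal comes from the timer; the sketch simply assumes all results are already available.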
13.4、Side Output Streams

Use a side output stream to raise water-level alerts.

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        // Tag that identifies the side output carrying the alert messages
        final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);

        SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator
                .keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        // Send the alert to the side output
                        if (value.getVc() > 10) {
                            ctx.output(warnTag, "Current water level=" + value.getVc() + ", above the threshold of 10!");
                        }
                        // Every record still goes to the main stream
                        out.collect(value);
                    }
                });

        process.print();
        process.getSideOutput(warnTag).printToErr("warn");
        env.execute();
    }
}

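14、State Management

14.1、State in Flink

Operators are either stateful or stateless.

A stateless operator task only looks at each event on its own and produces its output directly from the current input. Examples: map, filter, flatMap.
A stateful operator task needs other data besides the current record to compute its result; this "other data" is the state. Examples: aggregation operators and window operators.
Classification of state:

  1. Managed state and raw state

    Managed state: managed by Flink; to use it you only call the corresponding interfaces.

    Raw state: user-defined; it amounts to a block of memory that you manage yourself, so you implement state serialization and failure recovery on your own.

    Flink managed state is what is normally used (this is the important one).

  2. Operator state and keyed state

    State used after keyBy() is keyed state; everything else is operator state.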

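14.2、Operator State

Within one parallel subtask, all the records it processes access the same state; the state is shared across that subtask.

Operator state can be used on any operator and behaves much like a local variable.

14.3、Keyed State

State is maintained and accessed per key defined on the input stream, so it can only be used after keyBy.

14.3.1、Value State

The state stores a single value.

Raise an alert when the water level differs from the previous reading by more than 10.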

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedValueStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: Raise an alert when the water level differs from the previous reading by more than 10
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Previous water level of the current key
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // No previous value yet: fall back to 0, so the first reading is compared against 0
                        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                        if (Math.abs(value.getVc() - lastVc) > 10) {
                            out.collect("sensor id=" + value.getId() + ", current water level=" + value.getVc()
                                    + ", previous water level=" + lastVc + ", difference exceeds 10");
                        }
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
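
To see what "one value per key" means in isolation, the sketch below models keyed value state with an ordinary `HashMap` from sensor id to last reading. It is an illustrative assumption, not Flink code: `LastValueAlertSketch` and `onReading` are hypothetical names, and unlike the demo above it skips the comparison for the first reading of a key instead of comparing against 0.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed ValueState<Integer>: one slot per key.
class LastValueAlertSketch {
    private final Map<String, Integer> lastVcByKey = new HashMap<>();
    private final int maxDelta;

    LastValueAlertSketch(int maxDelta) {
        this.maxDelta = maxDelta;
    }

    // Returns an alert message, or null when there is nothing to report.
    String onReading(String id, int vc) {
        // put() stores the new value and returns the previous one (null for a new key),
        // which mirrors reading the state and then updating it.
        Integer last = lastVcByKey.put(id, vc);
        if (last == null) {
            return null; // first reading of this key: no baseline to compare with
        }
        if (Math.abs(vc - last) > maxDelta) {
            return "sensor " + id + ": " + last + " -> " + vc;
        }
        return null;
    }
}
```

In Flink the map lookup is implicit: the runtime scopes `lastVcState` to the key of the record being processed, so the function body never sees other keys' values.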
14.3.2、List State

Stores the data as a list.

For each sensor, output its three highest water levels.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: For each sensor, output its three highest water levels
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ListState<Integer> vcListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Integer>("vcListState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Add the new value, copy the state into a list and sort it in descending order
                        vcListState.add(value.getVc());
                        ArrayList<Integer> arrayList = new ArrayList<>();
                        for (Integer vc : vcListState.get()) {
                            arrayList.add(vc);
                        }
                        arrayList.sort((o1, o2) -> o2 - o1);
                        // Keep only three entries: drop the element at index 3 (the fourth and smallest)
                        if (arrayList.size() > 3) {
                            arrayList.remove(3);
                        }
                        out.collect("sensor id=" + value.getId() + ", top three water levels=" + arrayList.toString());
                        // Write the trimmed list back to state
                        vcListState.update(arrayList);
                    }
                })
                .print();

        env.execute();
    }
}
14.3.3、Map State

Stores key-value pairs as state.

For each sensor, count how many times each water level value occurs.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: For each sensor, count how many times each water level value occurs
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // key = water level (vc), value = number of occurrences
                    MapState<Integer, Integer> vcCountMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcCountMapState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<Integer, Integer>("vcCountMapState", Integer.class, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Integer vc = value.getVc();
                        if (vcCountMapState.contains(vc)) {
                            // Seen before: increment the count
                            int count = vcCountMapState.get(vc);
                            vcCountMapState.put(vc, count + 1);
                        } else {
                            // First occurrence of this water level
                            vcCountMapState.put(vc, 1);
                        }
                        StringBuilder outStr = new StringBuilder();
                        outStr.append("sensor id=" + value.getId() + "\n");
                        for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                            outStr.append(vcCount.toString() + "\n");
                        }
                        outStr.append("==============");
                        out.collect(outStr.toString());
                    }
                })
                .print();

        env.execute();
    }
}
14.3.4、Reducing State

Each added element is reduced into the stored value, and the reduced result is kept as the state.

Compute the sum of water levels for each sensor.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: Compute the sum of water levels for each sensor
 */
public class KeyedReduceStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ReducingState<Integer> vcSum;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcSum = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<Integer>("vcSum", new ReduceFunction<Integer>() {
                                    @Override
                                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                                        return value1 + value2;
                                    }
                                }, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // add() applies the ReduceFunction to the stored value and the new one
                        vcSum.add(value.getVc());
                        out.collect("sensor id=" + value.getId() + ", water level sum=" + vcSum.get());
                    }
                })
                .print();

        env.execute();
    }
}
14.3.5、Aggregating State

Similar to reducing state, except that the aggregation keeps its state in an accumulator, so the type of the state can differ from the type of the input.

Compute the average water level.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: Compute the average water level
 */
public class KeyedAggregateStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Input type Integer (vc), output type Double (average)
                    AggregatingState<Integer, Double> vcAggregatingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcAggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAggregatingState",
                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                            // Accumulator = (sum of water levels, number of readings)
                                            @Override
                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                return Tuple2.of(0, 0);
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                            }

                                            @Override
                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                return accumulator.f0 * 1D / accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                                // Combine two partial accumulators instead of returning null
                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                            }
                                        },
                                        Types.TUPLE(Types.INT, Types.INT)));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcAggregatingState.add(value.getVc());
                        Double vcAvg = vcAggregatingState.get();
                        out.collect("sensor id=" + value.getId() + ", average water level=" + vcAvg);
                    }
                })
                .print();

        env.execute();
    }
}
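
The reason an average needs an accumulator rather than a plain reduce is that two averages cannot be combined directly, while two (sum, count) pairs can. Below is a small plain-Java sketch of that accumulator, including a `merge` that adds the partial sums and counts; `AvgAccumulatorSketch` is a hypothetical class written for illustration and does not use Flink's `AggregateFunction` interface.

```java
// Immutable (sum, count) accumulator for a running average.
class AvgAccumulatorSketch {
    final long sum;
    final long count;

    AvgAccumulatorSketch(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    static AvgAccumulatorSketch empty() {
        return new AvgAccumulatorSketch(0L, 0L);
    }

    // Folds one input value into the accumulator.
    AvgAccumulatorSketch add(int value) {
        return new AvgAccumulatorSketch(sum + value, count + 1);
    }

    // Combines two partial accumulators: sums add up, counts add up.
    AvgAccumulatorSketch merge(AvgAccumulatorSketch other) {
        return new AvgAccumulatorSketch(sum + other.sum, count + other.count);
    }

    // Converts the accumulator into the output type; NaN when nothing was added.
    double result() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }
}
```

The three methods correspond to `add`, `merge` and `getResult` in the demo; the input type (int), the state type (the pair) and the output type (double) are all different, which is exactly what reducing state cannot express.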
14.3.6、State Time-to-Live (TTL)

When a state entry is created, its expiry time = current time + TTL. The expiry time can be refreshed by later accesses, depending on the update type. To use TTL, build a StateTtlConfig object and enable it on the state descriptor with enableTimeToLive.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedStateTTLDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description:
 */
public class KeyedStateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                .newBuilder(Time.seconds(5)) // expire after 5 s
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // creating or writing the state refreshes the expiry time
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return an expired value
                                .build();
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVc.enableTimeToLive(stateTtlConfig);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // Read the (possibly expired, hence null) value first, then overwrite it
                        Integer value1 = lastVcState.value();
                        out.collect("key=" + value.getId() + ", state value=" + value1);
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
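
The rule "expiry time = time of last write + TTL, and expired values are never returned" can be checked with a small model. The following plain-Java sketch is an assumption-laden illustration of the OnCreateAndWrite / NeverReturnExpired combination used above: `TtlValueSketch` is a hypothetical class, and the clock is injected as a `LongSupplier` so that time can be controlled by hand. It says nothing about how Flink cleans up expired entries internally.

```java
import java.util.function.LongSupplier;

// Plain-Java model of a single TTL-guarded value.
class TtlValueSketch<T> {
    private final long ttlMillis;
    private final LongSupplier clock; // injected so the behaviour is deterministic
    private T value;
    private long expiresAt;
    private boolean present;

    TtlValueSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // "OnCreateAndWrite": every write (including the first) resets the expiry time.
    void update(T newValue) {
        value = newValue;
        present = true;
        expiresAt = clock.getAsLong() + ttlMillis;
    }

    // "NeverReturnExpired": reads do not refresh the expiry, and an expired value reads as null.
    T value() {
        if (!present || clock.getAsLong() >= expiresAt) {
            value = null;
            present = false;
            return null;
        }
        return value;
    }
}
```

With a 5 s TTL, a value written at t=0 is still readable at t=4 s but not at t=6 s, even if it was read in between, because only writes refresh the expiry in this mode.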
14.4、Operator State

Operator state comes in three kinds: list state (ListState), union list state (UnionListState), and broadcast state (BroadcastState).

It is state defined on a parallel instance of an operator, and its scope is limited to the current operator subtask.

package state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: OperatorListDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: Count the number of records inside a map operator
 */
public class OperatorListDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("192.168.132.101", 7777)
                .map(new MyCountMapFunction())
                .print();

        env.execute();
    }

    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        // Local counter used on the hot path
        private long count = 0L;
        // Operator state that backs the counter across checkpoints
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persist the local variable: copy it into operator state.
         *
         * @param context
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState");
            state.clear();
            state.add(count);
        }

        /**
         * Initialize the local variable: on recovery, load the data from state back into
         * the local variable. Called once per subtask.
         *
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState");
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Long.class));
            if (context.isRestored()) {
                for (Long aLong : state.get()) {
                    count += aLong;
                }
            }
        }
    }
}

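Difference between list state and union list state when operator state is handed to new subtasks (for example after the parallelism changes):

  • List state: the entries are split evenly, round-robin, across the new subtasks.
  • Union list state: the states of all the original subtasks are merged into one complete list, and every new parallel subtask receives a full copy.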
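
The two redistribution schemes described above can be compared side by side with a toy model. This is a plain-Java sketch under the stated description only (round-robin split versus full copy); the class `OperatorStateRescaleSketch` and both method names are hypothetical, and the exact order in which Flink assigns entries to subtasks is not something this sketch claims to reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how operator list state could be handed to a new set of subtasks.
class OperatorStateRescaleSketch {
    private static List<List<Long>> emptyLists(int n) {
        List<List<Long>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    // List state: pool all entries, then deal them out round-robin.
    static List<List<Long>> splitRoundRobin(List<List<Long>> oldSubtasks, int newParallelism) {
        List<List<Long>> result = emptyLists(newParallelism);
        int i = 0;
        for (List<Long> old : oldSubtasks) {
            for (Long entry : old) {
                result.get(i % newParallelism).add(entry);
                i++;
            }
        }
        return result;
    }

    // Union list state: every new subtask receives the complete merged list.
    static List<List<Long>> union(List<List<Long>> oldSubtasks, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> old : oldSubtasks) {
            all.addAll(old);
        }
        List<List<Long>> result = emptyLists(newParallelism);
        for (List<Long> target : result) {
            target.addAll(all);
        }
        return result;
    }
}
```

With union list state each subtask has to decide for itself which entries it actually needs, which is why it suits cases where that choice depends on runtime information.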

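Broadcast state: all parallel subtasks of the operator hold the same copy of a global state.

Send an alert when the water level exceeds a given threshold; the threshold can be changed dynamically.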

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Title: OperatorBroadcastStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: Send an alert when the water level exceeds a given threshold; the threshold can be changed dynamically
 */
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        // Config stream, used to broadcast the configuration
        DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);
        final MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("configDS", String.class, Integer.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);

        BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);
        connect.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            /**
             * Handles records of the data stream.
             *
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
                Integer threshold = broadcastState.get("threshold");
                // If data arrives before any config record, the broadcast state is empty, so check for null
                threshold = threshold == null ? 0 : threshold;
                if (value.getVc() > threshold) {
                    out.collect("threshold exceeded, threshold=" + threshold);
                }
            }

            /**
             * Handles records of the broadcast config stream.
             *
             * @param value
             * @param ctx
             * @param out
             * @throws Exception
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
                state.put("threshold", Integer.valueOf(value));
            }
        }).print();

        env.execute();
    }
}
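
What "every subtask holds the same copy" buys you can be illustrated with a toy model in which each simulated subtask owns its own map and a config update is written to all of them. This is a plain-Java sketch, not Flink code: `BroadcastThresholdSketch`, `onConfig` and `exceeds` are hypothetical names, and the default threshold of 0 before any config arrives simply mirrors the null check in the demo above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broadcast state: one identical map per parallel subtask.
class BroadcastThresholdSketch {
    private final List<Map<String, Integer>> subtaskState = new ArrayList<>();

    BroadcastThresholdSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskState.add(new HashMap<>());
        }
    }

    // A config record is delivered to every subtask, so all copies stay in sync.
    void onConfig(int threshold) {
        for (Map<String, Integer> state : subtaskState) {
            state.put("threshold", threshold);
        }
    }

    // A data record is handled by one subtask and reads only that subtask's copy.
    boolean exceeds(int subtask, int vc) {
        int threshold = subtaskState.get(subtask).getOrDefault("threshold", 0);
        return vc > threshold;
    }
}
```

Because data records only read their local copy, the result is the same no matter which subtask a record lands on, provided every subtask applies the config updates identically.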
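14.5、State Backends

How state is stored, accessed and maintained is decided by a pluggable component called the state backend. It is mainly responsible for how and where local state is stored.

14.5.1、Types of State Backend

State backends work out of the box and can be configured independently, without changing the program logic. There are two: the hash map state backend (HashMapStateBackend, the default) and the embedded RocksDB state backend (EmbeddedRocksDBStateBackend).

  1. Hash map state backend: state lives in memory. State is treated as ordinary objects kept on the TaskManager's JVM heap and stored as key-value pairs.
  2. RocksDB state backend: RocksDB is a key-value database; state is stored on the local disk.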
