Flink Review Notes (13): Flink Advanced and New Features (2) - ProcessFunction API and Dual-Stream Join

Flink Study Notes

Preface: today is day 13 of learning Flink! I studied two advanced/new features, the ProcessFunction API and dual-stream joins. They mainly address incremental aggregation over streaming data, and the widening of fast-changing streams, that is, merging several changing data sources into one. Combining my own experiments and code practice, I have written down my understanding and ideas, and I look forward to discussing them with you!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


Table of Contents

  • Flink Study Notes
    • IV. Flink Advanced and New Features
      • 2. Process Function API
        • 2.1 Process Function categories
        • 2.2 KeyedProcessFunction [key point]
        • 2.3 ProcessWindowFunction with incremental aggregation
          • 2.3.1 Usage overview
          • 2.3.2 Incremental window aggregation with ReduceFunction
          • 2.3.3 Incremental window aggregation with AggregateFunction
          • 2.3.4 Using per-window state in ProcessWindowFunction
      • 3. Dual-Stream Join
        • 3.1 Interview overview
        • 3.2 Window Join
          • 3.2.1 Tumbling Window Join
          • 3.2.2 Sliding Window Join
          • 3.2.3 Session Window Join
          • 3.2.4 Case study
        • 3.3 Interval Join
          • 3.3.1 Interval Join introduction
          • 3.3.2 Case study

IV. Flink Advanced and New Features

2. Process Function API

The transformation operators covered earlier cannot access timestamp or watermark information, but a Process Function can: it can read timestamps and watermarks and also register timers. Flink SQL itself is implemented on top of Process Functions.

2.1 Process Function categories
  • 1- ProcessFunction: for a plain DataStream
  • 2- KeyedProcessFunction: for a keyed DataStream
  • 3- CoProcessFunction: for two streams connected with connect (see the sketch after this list)
  • 4- ProcessJoinFunction: for join operations on two streams
  • 5- BroadcastProcessFunction: for broadcast streams
  • 6- KeyedBroadcastProcessFunction: for broadcast streams after keyBy
  • 7- ProcessWindowFunction: for windows on a keyed stream
  • 8- ProcessAllWindowFunction: for windows on a non-keyed stream (windowAll)
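
None of the case studies below exercise CoProcessFunction, so here is a minimal sketch of how it pairs with connect. This sketch is an illustrative assumption, not part of the original notes: the ports, the filtering logic, and keeping the control value in a plain field (only safe at parallelism 1; keyed state would be the production choice) are all placeholders.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class CoProcessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // the plain field below is only safe at parallelism 1

        DataStream<String> control = env.socketTextStream("node1", 8888); // assumed port
        DataStream<String> data = env.socketTextStream("node1", 9999);

        control.connect(data)
                .process(new CoProcessFunction<String, String, String>() {
                    // last value seen on the control stream
                    private String lastControl = "";

                    @Override
                    public void processElement1(String ctrl, Context ctx, Collector<String> out) {
                        // called for every element of the first (control) stream
                        lastControl = ctrl;
                    }

                    @Override
                    public void processElement2(String value, Context ctx, Collector<String> out) {
                        // called for every element of the second (data) stream:
                        // forward only records mentioning the latest control value
                        if (value.contains(lastControl)) {
                            out.collect(value);
                        }
                    }
                })
                .print();

        env.execute();
    }
}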

2.2 KeyedProcessFunction [key point]

As an extension of ProcessFunction, KeyedProcessFunction additionally exposes, in its onTimer(...) method, the key that the firing timer belongs to.

All Process Functions inherit from the RichFunction interface, so they all provide:

  • open()
  • close()
  • getRuntimeContext()

KeyedProcessFunction additionally provides two methods (a minimal sketch follows this list):

  • processElement: called once per element
  • onTimer: the callback invoked when a timer fires
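
A minimal sketch of how the two methods cooperate, before the full case study below. The String key/value types, the keyedStream variable, and the 10-second delay are assumptions for illustration only:

// keyedStream is assumed to be a KeyedStream<String, String>
keyedStream.process(new KeyedProcessFunction<String, String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // schedule a callback 10 seconds of processing time from now (illustrative delay)
        long ts = ctx.timerService().currentProcessingTime() + 10_000L;
        ctx.timerService().registerProcessingTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // onTimer knows which key the firing timer belongs to
        out.collect("timer fired at " + timestamp + " for key " + ctx.getCurrentKey());
    }
});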

Case study: in server operations, rack temperature must be monitored in real time. If the temperature exceeds a threshold (100 degrees) within a given period, and each new reading is higher than the previous one, an alert must be raised (the temperature keeps climbing).

package cn.itcast.day12.process;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.commons.collections.IteratorUtils;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-03-08 13:01:05
 * @description TODO: input record format: (id, temperature)
 */
public class SystemMonitorDemo {
    public static void main(String[] args) throws Exception {
        // todo 1) initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2) set the parallelism to 1
        env.setParallelism(1);
        // todo 3) connect to the source
        DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);
        // todo 4) convert the input lines into tuples
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> tupleDataStream = socketTextStream.map(
                new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arrayData = line.split(",");
                return Tuple2.of(Integer.parseInt(arrayData[0]), Integer.parseInt(arrayData[1]));
            }
        });
        // todo 5) key the stream by rack id
        KeyedStream<Tuple2<Integer, Integer>, Integer> tuple2TupleKeyedStream = tupleDataStream.keyBy(t -> t.f0);
        // todo 6) apply a custom function extending the KeyedProcessFunction abstract class
        SingleOutputStreamOperator<String> result = tuple2TupleKeyedStream.process(new MyKeyedProcessFunction());
        // todo 7) print the result
        result.printToErr();
        // todo 8) run the job
        env.execute();
    }

    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, Integer>, String> {
        // state holding the temperature readings seen so far
        private ListState<Tuple2<Integer, Integer>> listState = null;
        // date formatter
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // timestamp of the currently registered timer
        private Long timeTS = 0L;

        /**
         * Initialize resources.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // instantiate the state object
            this.listState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<Integer, Integer>>(
                    "listState",
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));
            System.out.println("state object initialized...");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        /**
         * Timer callback.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            System.out.println("timer fired...");
            // copy the state into a list and count the elements
            int stateSize = IteratorUtils.toList(this.listState.get().iterator()).size();
            if (stateSize >= 2) {
                // at least two consecutive rising readings above the threshold: raise the alert
                out.collect("alert triggered");
            }
            // clear the historical state
            this.listState.clear();
        }

        /**
         * Called once per input element.
         */
        @Override
        public void processElement(Tuple2<Integer, Integer> integerIntegerTuple2, Context context, Collector<String> collector) throws Exception {
            // fetch the most recent reading stored in state
            Tuple2<Integer, Integer> lastData = null;
            for (Tuple2<Integer, Integer> tuple : listState.get()) {
                lastData = tuple;
            }
            // default when the state is empty
            if (lastData == null) {
                lastData = Tuple2.of(0, 0);
            }
            System.out.println("last reading from state: " + lastData);
            if (integerIntegerTuple2.f1 > 100 && integerIntegerTuple2.f1 > lastData.f1) {
                System.out.println("temperature rising... registering a timer!");
                // the temperature exceeds 100 and is higher than the previous reading:
                // store the current reading
                listState.add(Tuple2.of(integerIntegerTuple2.f0, integerIntegerTuple2.f1));
                // register a timer (current processing time + window length = firing time)
                timeTS = context.timerService().currentProcessingTime() + 10000L;
                context.timerService().registerProcessingTimeTimer(timeTS);
            } else {
                if (integerIntegerTuple2.f1 < lastData.f1) {
                    System.out.println("temperature dropped... deleting the timer!");
                    // cancel the timer
                    context.timerService().deleteProcessingTimeTimer(timeTS);
                }
                if (integerIntegerTuple2.f1 < 100) {
                    // clear the stored state
                    listState.clear();
                }
            }
        }
    }
}

Result:

Input:
1,100
1,101

Output:
temperature rising... registering a timer!
alert triggered

2.3 ProcessWindowFunction with incremental aggregation
[Figure: the reduce/aggregate overloads in the DataStream API that accept a ProcessWindowFunction]

Both reduce and aggregate provide overloads that can be combined with a ProcessWindowFunction to implement incremental aggregation (the starred overloads in the figure).

Principle: within a window, the elements are first aggregated incrementally; before the window closes, the incremental result is handed to the ProcessWindowFunction as its input for the final full-window processing.

Benefit: you get incremental aggregation while still having access to window metadata (start time, state, and so on).


2.3.1 Usage overview
input.keyBy(...)
     .timeWindow(...)
     .reduce(incrAggregator: ReduceFunction[IN],
             function: ProcessWindowFunction[IN, OUT, K, W])

input.keyBy(...)
     .timeWindow(...)
     .aggregate(incrAggregator: AggregateFunction[IN, ACC, V],
                windowFunction: ProcessWindowFunction[V, OUT, K, W])

2.3.2 Incremental window aggregation with ReduceFunction

Data:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

Example: within one window (the window size), find for each user (keyBy) the record with the highest product price among the products browsed (ReduceFunction), and also emit the key and window information.

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * @author lql
 * @time 2024-03-08 17:06:59
 * @description TODO
 */
public class ReduceAndProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        // todo 3) parse the JSON input into a Java bean
        lines.process(new SocketProcessFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserActionLog>() {
                                    @Override
                                    public long extractTimestamp(UserActionLog userActionLog, long l) {
                                        try {
                                            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                            return format.parse(userActionLog.getEventTime()).getTime();
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                            return 0L;
                                        }
                                    }
                                }))
                // key by user
                .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
                // tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // window function: per user and window, keep the record with the highest product price
                .reduce(
                        // incremental aggregation
                        new ReduceFunction<UserActionLog>() {
                            @Override
                            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
                            }
                        },
                        // window function: the iterable holds a single, already pre-aggregated element
                        new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                                UserActionLog max = elements.iterator().next();
                                System.out.println("number of elements in the iterable: " + IteratorUtils.toList(elements.iterator()).size());
                                String windowStart = new DateTime(context.window().getStart()).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd = new DateTime(context.window().getEnd()).toString("yyyy-MM-dd HH:mm:ss");
                                String record = "key:" + key
                                        + "\nwindow start: " + windowStart
                                        + "\nwindow end: " + windowEnd
                                        + "\nrecord with the highest product price: " + max;
                                out.collect(record);
                            }
                        })
                .print();
        // todo 4) run the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * Parses the incoming JSON strings into Java beans.
     */
    private static class SocketProcessFunction extends ProcessFunction<String, UserActionLog> {
        /**
         * Called once per input element.
         */
        @Override
        public void processElement(String s, Context context, Collector<UserActionLog> collector) throws Exception {
            collector.collect(JSON.parseObject(s, UserActionLog.class));
        }
    }
}

Result:

number of elements in the iterable: 1
key:user_1
window start: 2020-11-09 10:41:30
window end: 2020-11-09 10:41:35
record with the highest product price: ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:33, eventType=browse, productID=product_1, productPrice=30)
number of elements in the iterable: 1
key:user_1
window start: 2020-11-09 10:41:35
window end: 2020-11-09 10:41:40
record with the highest product price: ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:38, eventType=browse, productID=product_1, productPrice=70)

Summary:

  • 1- Set the parallelism to 1 first, so the result is easy to observe on a small amount of data
  • 2- reduce/aggregate cannot take a RichReduceFunction here; doing so fails with: ReduceFunction of apply can not be a RichFunction.

2.3.3 Incremental window aggregation with AggregateFunction

Data:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

Example: within one window (the window size), compute for each user (keyBy) the average price of the products browsed (AggregateFunction), and emit the key and window information.

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * @author lql
 * @time 2024-03-08 17:59:42
 * @description TODO
 */
public class AggregateAndProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        // parse the JSON input (originally read from Kafka) into a Java bean
        lines.process(new KafkaProcessFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserActionLog>() {
                                    @Override
                                    public long extractTimestamp(UserActionLog element, long recordTimestamp) {
                                        try {
                                            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                            return format.parse(element.getEventTime()).getTime();
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                            return 0L;
                                        }
                                    }
                                }))
                // key by user
                .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
                // tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // window function: per user and window, the average price of browsed products,
                // emitted together with the key and window information
                .aggregate(
                        new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {
                            // 1. initial value of the accumulator (count, price sum)
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return new Tuple2<>(0L, 0L);
                            }

                            // 2. how one input record is folded into the accumulator
                            @Override
                            public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                                accumulator.f0 += 1;
                                accumulator.f1 += value.getProductPrice();
                                return accumulator;
                            }

                            // 3. how two accumulators (e.g. the one held in state) are merged
                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                                acc1.f0 += acc2.f0;
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            @Override
                            public Double getResult(Tuple2<Long, Long> longLongTuple2) {
                                return longLongTuple2.f1 / (longLongTuple2.f0 * 1.0);
                            }
                        },
                        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
                                Double avg = elements.iterator().next();
                                String windowStart = new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd = new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                                String record = "Key: " + key
                                        + " window start: " + windowStart
                                        + " window end: " + windowEnd
                                        + " average product price: " + String.format("%.2f", avg);
                                out.collect(record);
                            }
                        })
                .print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * Parses the incoming JSON strings (originally read from Kafka) into Java beans.
     */
    private static class KafkaProcessFunction extends ProcessFunction<String, UserActionLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserActionLog> out) throws Exception {
            out.collect(JSON.parseObject(value, UserActionLog.class));
        }
    }
}

Result:

Key: user_1 window start: 2020-11-09 10:41:30 window end: 2020-11-09 10:41:35 average product price: 20.00
Key: user_1 window start: 2020-11-09 10:41:35 window end: 2020-11-09 10:41:40 average product price: 40.00

Summary:

  • The heart of this approach is the aggregate accumulator; the ProcessWindowFunction half mainly exists to additionally emit window metadata and state information.

2.3.4 Using per-window state in ProcessWindowFunction

Unlike a plain WindowFunction, a ProcessWindowFunction gives you not only the data inside the window but also two kinds of state (a short access sketch follows this list):

  • WindowState: state bound to the current window; once the window is disposed of, this state disappears with it.
  • GlobalState: state bound to the key; it can accumulate values across multiple windows.
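
A short sketch of how both scopes are reached inside ProcessWindowFunction.process (the descriptor name and the Long value type are illustrative assumptions; per-window state you create should be cleaned up in clear() once the window is purged):

// inside ProcessWindowFunction.process(key, context, elements, out):
// the same descriptor, resolved against two different scopes
ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("perWindowCount", Long.class);

// bound to the current window; disposed of with it (clean up in clear())
ValueState<Long> windowScoped = context.windowState().getState(desc);

// bound to the current key; survives across windows and keeps accumulating
ValueState<Long> keyScoped = context.globalState().getState(desc);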

Data:

1000,spark,2
5000,spark,2
6000,spark,3
10000,spark,5

Example:

package cn.itcast.day12.process;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Iterator;

/**
 * @author lql
 * @time 2024-03-08 20:36:12
 * @description TODO
 */
public class WindowStateAndGlobalStateFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        // sample input:
        // 1000,spark,3
        // 1200,spark,5
        // 2000,hadoop,2
        // socketTextStream returns a DataStream with parallelism 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        // keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        // Non-keyed window: no keyBy, call windowAll with a window assigner.
        // Keyed window: keyBy first, then call window with a window assigner.
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        // A plain sum or reduce only aggregates the data inside one window,
        // without accumulating against history.
        // Requirement: aggregate incrementally inside the window AND accumulate with history.
        SingleOutputStreamOperator<String> result = windowed.aggregate(new MyAggFunc(), new MyWindowFunc());
        result.print();
        env.execute();
    }

    private static class MyAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {
        // create the initial value
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        // fold each input record into the initial value / intermediate result
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        // the returned result
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        // only needed for session windows; can stay unimplemented otherwise
        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }

    private static class MyWindowFunc extends ProcessWindowFunction<Integer, String, String, TimeWindow> {
        // one descriptor for window state, one for global state
        private transient ReducingStateDescriptor<Integer> windowStateDescriptor;
        private transient ReducingStateDescriptor<Integer> globalStateDescriptor;

        @Override
        public void open(Configuration parameters) throws Exception {
            windowStateDescriptor = new ReducingStateDescriptor<Integer>("window",
                    new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, TypeInformation.of(new TypeHint<Integer>() {}));
            globalStateDescriptor = new ReducingStateDescriptor<Integer>("global",
                    new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, TypeInformation.of(new TypeHint<Integer>() {}));
        }

        @Override
        public void process(String key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
            Integer sum = 0;
            Iterator<Integer> iterator = elements.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next();
            }
            ReducingState<Integer> windowState = context.windowState().getReducingState(windowStateDescriptor);
            ReducingState<Integer> globalState = context.globalState().getReducingState(globalStateDescriptor);
            // lambda iteration: t is each pre-aggregated element;
            // it is added to both the window-scoped and the key-scoped state
            elements.forEach(t -> {
                try {
                    windowState.add(t);
                    globalState.add(t);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            });
            out.collect(key + ",window:" + windowState.get() + ",global:" + globalState.get());
        }
    }
}

Result:

1> spark,window:2,global:2
1> spark,window:5,global:7

Summary:

  • 1- After declaring the two state descriptors, override the open method;
  • 2- In open, create a new ReducingStateDescriptor for each and override its reduce method to do the accumulation;
  • 3- In process, iterate over the elements and sum them;
  • 4- Most importantly, use context.windowState() vs. context.globalState(): that is the key difference!

3. Dual-Stream Join

3.1 Interview overview

Joins broadly fall into just two categories: Window Join and Interval Join.

  • Window Join: buffers the data in window state and performs the join when the window fires

    • Tumbling Window Join
    • Sliding Window Join
    • Session Window Join
  • Interval Join

    • also buffers data in state before processing; the difference is that the state expires, with cleanup triggered by arriving data

3.2 Window Join
3.2.1 Tumbling Window Join

When a tumbling window join is executed, all elements sharing a common key and a common tumbling window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note: nothing is emitted in tumbling window [6,7], because the green stream has no elements to combine with the orange elements ⑥ and ⑦.

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.2 Sliding Window Join

When a sliding window join is executed, all elements sharing a common key and a common sliding window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note: in window [2,3] the orange ② is joined with the green ③, but in window [1,2] it is joined with nothing.

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.3 Session Window Join

When a session window join is executed, all elements with the same key that, when combined, satisfy the session criterion are joined in pairwise combinations and passed to a JoinFunction or FlatJoinFunction.

Note: in the third session the green stream has no elements, so ⑧ and ⑨ are not joined!

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.4 Case study

Example: two custom sources simulate the data: one source produces order items, the other produces goods. A window join relates the two streams.

package cn.itcast.day13.join;

/**
 * @author lql
 * @time 2024-03-09 21:03:00
 * @description TODO:
 * Approach:
 * A window join first uses where and equalTo to pick the key used for the join;
 * here both streams are keyed on goodsId.
 * A 5-second tumbling window is set; elements of the two streams are joined within that window.
 * The apply method turns a pair of differently typed elements into one new element.
 */

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Case study:
 * Two custom sources simulate data; one produces order items, the other goods.
 * A window join relates the two streams.
 */
public class JoinDemo01 {
    public static void main(String[] args) throws Exception {
        // todo 1) initialize the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2) set the parallelism
        env.setParallelism(1);
        // todo 3) build the sources
        // goods stream
        // the RichSourceFunction below does not declare a type parameter,
        // so the return type must be given explicitly!
        SingleOutputStreamOperator<Goods> goodsDataStream = env
                .addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // order item stream
        SingleOutputStreamOperator<OrderItem> orderItemDataStream = env
                .addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderDetailWatermark());

        DataStream<FactOrderItem> result = goodsDataStream.join(orderItemDataStream)
                // key of the first stream
                .where(Goods::getGoodsId)
                // key of the second stream
                .equalTo(OrderItem::getGoodsId)
                // the join window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
                    @Override
                    public FactOrderItem join(Goods goods, OrderItem orderItem) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(goods.getGoodsId());
                        factOrderItem.setGoodsName(goods.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(orderItem.getCount()));
                        factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(orderItem.getCount())));
                        return factOrderItem;
                    }
                });
        result.printToErr();
        env.execute();
    }

    // goods bean
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // order item bean
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // join result bean
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        private String itemId;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // goods source (think of it as a dimension table)
    public static class GoodsSource11 extends RichSourceFunction {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                // stream() turns the list into a stream;
                // the lambda collects each goods object into the source context
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // order item source
    public static class OrderItemSource extends RichSourceFunction {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // no ready-made watermark strategy is used here, so both factory methods are overridden!
    /**
     * Watermark strategy for the goods stream.
     */
    private static class GoodsWatermark implements WatermarkStrategy<Goods> {
        // a custom WatermarkGenerator; see the custom-watermark chapter
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods goods, long eventTimestamp, WatermarkOutput watermarkOutput) {
                    System.out.println("goods event time: " + System.currentTimeMillis());
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            // every Goods element is stamped with the current wall-clock time
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }

    /**
     * Watermark strategy for the order item stream.
     */
    public static class OrderDetailWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    System.out.println("order item event time: " + System.currentTimeMillis());
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }
}

Result:

order item event time: 1709991872660
goods event time: 1709991872660
order item event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
goods event time: 1709991872661
order item event time: 1709991873665
goods event time: 1709991873665
order item event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
goods event time: 1709991873665
order item event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
goods event time: 1709991874665
order item event time: 1709991874665
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}

Summary:

  • 1- Take care when defining the Java bean classes that carry the stream records
  • 2- For the windowed streams, mind how the watermark generator is built and when watermarks are emitted
  • 3- A JoinFunction requires overriding the join method

3.3 Interval Join
3.3.1 Interval Join introduction
  • An interval join also joins two streams (stream A and stream B) on the same key, but additionally requires the timestamp of each stream-B element to fall within a time interval relative to the timestamp of the stream-A element.

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

In other words: stream B's element timestamp ≥ stream A's element timestamp + lower bound (negative), and stream B's element timestamp ≤ stream A's element timestamp + upper bound (positive).

Both bounds are inclusive by default, but this can be changed by applying .lowerBoundExclusive() and .upperBoundExclusive() (see the sketch after the template below).

Usage template:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
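
Both bounds in the template above are inclusive. A sketch in the same template style showing the exclusivity switches chained after between(...) (stream names as above):

orangeStream.keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .lowerBoundExclusive()   // drops pairs where b.timestamp == a.timestamp - 2
    .upperBoundExclusive()   // drops pairs where b.timestamp == a.timestamp + 1
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });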

3.3.2 Case study

Example:

package cn.itcast.day13.join;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-03-09 22:27:18
 * @description TODO
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // goods stream
        DataStream<Goods> goodsDS = env
                .addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // order item stream
        DataStream<OrderItem> orderItemDS = env
                .addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // the interval join
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS
                .keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();
        env.execute("Interval JOIN");
    }

    // goods bean
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // order item bean
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // join result bean
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // goods source (think of it as a dimension table)
    public static class GoodsSource11 extends RichSourceFunction {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // order item source
    public static class OrderItemSource extends RichSourceFunction {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // watermark strategies (kept simple: the current system time is used directly)
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}

Result:

5> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
3> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}

Summary:

  • 1- connect + broadcast suits data that almost never changes
  • 2- BroadcastState suits data that changes, but not quickly
  • 3- A dual-stream join suits fast-changing streaming data (stock prices, for example)
