Flink DataStream API 介绍

一、介绍

官网

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

二、基础算子

1、Map

Map算子:输入一个元素同时输出一个元素,这里的写法和Java类似,可以使用糖化语法或者实现Function接口

package com.xx.common.study.api.base;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author xiaxing* @describe Map算子,输入一个元素同时输出一个元素,这里的写法和Java类似,可以使用糖化语法或者实现Function接口* @since 2024/5/17 14:27*/
public class DataStreamMapApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3);// 糖化语法SingleOutputStreamOperator<Integer> multiStream = sourceStream.map(e -> e * 2);multiStream.print("数据乘2");// 实现Function接口SingleOutputStreamOperator<Integer> addStream = sourceStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return value + 2;}});addStream.print("数据加2");env.execute();}
}

2、FlatMap

FlatMap算子:输入一个元素同时产生零个、一个或多个元素

package com.xx.common.study.api.base;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;
import java.util.List;/*** @author xiaxing* @describe FlatMap算子,输入一个元素同时产生零个、一个或多个元素* @since 2024/5/17 14:27*/
public class DataStreamFlatMapApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> sourceStream = env.fromElements("1,2,3");// 对source进行加工处理sourceStream.flatMap((FlatMapFunction<String, List<String>>) (value, out) -> {String[] split = value.split(",");out.collect(Arrays.asList(split));}).print();// 错误写法,和Java写法不用,无法使用这种糖化语法
//        sourceStream.flatMap((k, v) -> {
//            String[] split = k.split(",");
//            v.collect(split);
//        }).print();env.execute();}
}

3、Filter

Filter算子:为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素

package com.xx.common.study.api.base;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author xiaxing* @describe Filter算子,为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素* @since 2024/5/17 14:27*/
public class DataStreamFilterApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3);// 保留整数sourceStream.filter(e -> (e % 2) == 0).print("糖化语法保留整数");sourceStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value % 2 == 0;}}).print("实现Function保留整数");env.execute();}
}

4、KeyBy

KeyBy算子:在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的

package com.xx.common.study.api.base;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author xiaxing* @describe KeyBy算子,在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的* @since 2024/5/17 14:27*/
public class DataStreamKeyByApiDemo {@Data@AllArgsConstructor@NoArgsConstructorpublic static class keyByDemo {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<keyByDemo> sourceStream = env.fromElements(new keyByDemo(1, 1),new keyByDemo(2, 2),new keyByDemo(3, 3),new keyByDemo(1, 4));KeyedStream<keyByDemo, Integer> keyByStream = sourceStream.keyBy(keyByDemo::getId);keyByStream.print("按照key分组");// 使用key分组之后可以使用一些常用的聚合算子// positionToSum:可以用于Tuple类型数据传递索引位置,field:传递字段名称keyByStream.sum("count").print();env.execute();}
}

5、Reduce

Reduce算子:在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值

package com.xx.common.study.api.base;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author xiaxing* @describe Reduce算子,在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值* @since 2024/5/17 14:27*/
public class DataStreamReduceApiDemo {@Data@AllArgsConstructor@NoArgsConstructorpublic static class reduceByDemo {private String id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<reduceByDemo> sourceStream = env.fromElements(new reduceByDemo("1", 1),new reduceByDemo("2", 2),new reduceByDemo("3", 3),new reduceByDemo("1", 4));sourceStream.keyBy(reduceByDemo::getId).reduce(new ReduceFunction<reduceByDemo>() {@Overridepublic reduceByDemo reduce(reduceByDemo value1, reduceByDemo value2) throws Exception {value1.setCount(value1.getCount() + value2.getCount());return value1;}}).print();env.execute();}
}

三、窗口

官网地址

3.1、概念

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

Flink 中的时间有三种类型:

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。

  • Ingestion Time:是数据进入 Flink 的时间。

  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。

3.2、语法

Keyed Windows

stream.keyBy(...)               <-  仅 keyed 窗口需要.window(...)              <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)[.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)[.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output).reduce/aggregate/apply()      <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

Non-Keyed Windows

stream.windowAll(...)           <-  必填项:"assigner"[.trigger(...)]            <-  可选项:"trigger" (else default trigger)[.evictor(...)]            <-  可选项:"evictor" (else no evictor)[.allowedLateness(...)]    <-  可选项:"lateness" (else zero)[.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data).reduce/aggregate/apply()      <-  必填项:"function"[.getSideOutput(...)]      <-  可选项:"output tag"

3.3、Window Assigners

Window Assigners为抽象类,Flink默认已经实现了4种窗口

3.3.1、滚动窗口(Tumbling Windows)

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。

在这里插入图片描述

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

3.3.2、滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

3.3.3、会话窗口(Session Windows)

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);

3.3.4、全局窗口(Global Windows)

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

在这里插入图片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

3.4、窗口函数

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据

窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
前两者执行起来更高效,因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information

3.4.1、ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同

package com.xx.common.study.api.windows;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @author xiaxing* @describe 窗口函数-reduce* @since 2024/5/17 14:27*/
public class DataStreamWindowsReduceApiDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class TumblingWindows {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);socketStream.map(new MapFunction<String, TumblingWindows>() {@Overridepublic TumblingWindows map(String value) throws Exception {String[] split = value.split(",");return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}}).keyBy(new KeySelector<TumblingWindows, Integer>() {@Overridepublic Integer getKey(TumblingWindows value) throws Exception {return value.getId();}}).window(TumblingProcessingTimeWindows.of(Time.seconds(5L))).reduce(new ReduceFunction<TumblingWindows>() {@Overridepublic TumblingWindows reduce(TumblingWindows value1, TumblingWindows value2) throws Exception {return new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount());}}).print();env.execute();}
}

3.4.2、ReduceFunction糖化语法

使用Lambda糖化语法对代码进行了简化

package com.xx.common.study.api.windows;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @author xiaxing* @describe 窗口函数-reduce-糖化语法* @since 2024/5/17 14:27*/
public class DataStreamWindowsReduceLambdaApiDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class TumblingWindows {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);socketStream.map(value -> {String[] split = value.split(",");return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}).keyBy(TumblingWindows::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5L))).reduce((value1, value2) -> new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount())).print();env.execute();}
}

3.4.3、AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。 输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)

package com.xx.common.study.api.windows;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.util.Optional;/*** @author xiaxing* @describe 窗口函数-Aggregate* @since 2024/5/17 14:27*/
public class DataStreamWindowsAggregateApiDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class TumblingWindows {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);// 求和AggregateFunction<TumblingWindows, TumblingWindows, TumblingWindows> aggregateFunction = new AggregateFunction<TumblingWindows, TumblingWindows, TumblingWindows>() {@Overridepublic TumblingWindows createAccumulator() {// 创建累加器,并将其初始化为默认值return new TumblingWindows();}@Overridepublic TumblingWindows add(TumblingWindows value, TumblingWindows accumulator) {// 将输入的元素添加到累加器,返回更新后的累加器Integer count1 = Optional.of(value.getCount()).orElse(0);Integer count2 = Optional.ofNullable(accumulator.getCount()).orElse(0);return new TumblingWindows(value.getId(), count1 + count2);}@Overridepublic TumblingWindows getResult(TumblingWindows accumulator) {// 从累加器中提取操作的结果return accumulator;}@Overridepublic TumblingWindows merge(TumblingWindows a, TumblingWindows b) {// 将两个累加器合并为一个新的累加器return new TumblingWindows(a.getId(), a.getCount() + b.getCount());}};socketStream.map(value -> {String[] split = value.split(",");return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}).keyBy(TumblingWindows::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5L))).aggregate(aggregateFunction).print();env.execute();}
}

3.4.4、ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

package com.xx.common.study.api.windows;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** @author xiaxing* @describe 窗口函数-reduce-process* @since 2024/5/17 14:27*/
public class DataStreamWindowsReduceProcessApiDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class TumblingWindows {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);socketStream.map(value -> {String[] split = value.split(",");return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}).keyBy(TumblingWindows::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5L))).reduce(new MyReduceFunction(), new MyProcessWindowsFunction()).print();env.execute();}private static class MyReduceFunction implements ReduceFunction<TumblingWindows> {@Overridepublic TumblingWindows reduce(TumblingWindows value1, TumblingWindows value2) throws Exception {return new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount());}}private static class MyProcessWindowsFunction extends ProcessWindowFunction<TumblingWindows, TumblingWindows, Integer, TimeWindow> {@Overridepublic void process(Integer integer, ProcessWindowFunction<TumblingWindows, TumblingWindows, Integer, TimeWindow>.Context context, Iterable<TumblingWindows> elements, Collector<TumblingWindows> out) throws Exception {elements.forEach(e -> {Integer count = e.getCount();// 当count > 10时才数据元素if (count > 10) {out.collect(e);}});}}
}

3.5、窗口算子

除了窗口函数外,Flink还提供了一些窗口算子,其用法类似于基础算子,这部分内容和上面的窗口存在部分重复,上面主要想说明Flink中涉及哪些类型的窗口,这块演示一下一些窗口算子的基本用法,在后面的Process Function会更加深入的使用窗口函数

3.5.1、Window

可以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组

package com.xx.common.study.api.windows.base;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @author xiaxing* @describe Window算子, 以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组* @since 2024/5/17 14:27*/
public class DataStreamWindowsApiDemo {@Data@AllArgsConstructor@NoArgsConstructorpublic static class WindowByDemo {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 7777);// 将5秒内的数据按照key分组加起来sourceStream.map((value) -> {String[] split = value.split(",");return new WindowByDemo(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}).keyBy(WindowByDemo::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5L))).sum("count").print();env.execute();}
}

3.5.1、WindowAll

WindowAll算子,不需要调用keyBy算子

package com.xx.common.study.api.windows.base;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @author xiaxing* @describe WindowAll算子,不需要调用keyBy算子* @since 2024/5/17 14:27*/
public class DataStreamWindowsAllApiDemo {@Data@AllArgsConstructor@NoArgsConstructorpublic static class WindowAllDemo {private Integer id;private Integer count;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 7777);// 将5秒内的数据加起来sourceStream.map((value) -> {String[] split = value.split(",");return new WindowAllDemo(Integer.valueOf(split[0]), Integer.valueOf(split[1]));}).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5L))).sum("count").print();env.execute();}
}

3.5.1、Window Apply

3.5.1、WindowReduce

四、Join

五、Process Function

六、旁路输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/333043.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速排序算法备考

快排模板 快速排序(快排) (C语言实现)_c语言快速排序_Brant_zero2022的博客-CSDN博客 快排使用递归来实现 关键思想:划分 //划分 int partion(int A[],int L,int R){int midA[L];while(L<R){//每一次划分:左边元素<枢轴元素<右边元素//R往前找&#xff0c;直到找到一…

IO系列(八) -浅析NIO工作原理

一、简介 现在使用 NIO 的场景越来越多&#xff0c;很多网上的技术框架或多或少的使用 NIO 技术&#xff0c;譬如 Tomcat、Jetty、Netty&#xff0c;学习和掌握 NIO 技术已经不是一个 Java 攻城狮的加分技能&#xff0c;而是一个必备技能。 那什么是 NIO 呢&#xff1f; NIO…

不拍视频,不直播怎么在视频号卖货赚钱?开一个它就好了!

大家好&#xff0c;我是电商糖果 视频号这两年看着抖音卖货的热度越来越高&#xff0c;也想挤进电商圈。 于是它模仿抖音推出了自己的电商平台——视频号小店。 只要商家入驻视频号小店&#xff0c;就可以在视频号售卖商品。 具体怎么操作呢&#xff0c;需要拍视频&#xf…

Redis实践—全国地址信息缓存

一、背景 在涉及全国地址的应用中&#xff0c;地址信息通常被频繁地查询和使用&#xff0c;例如电商平台、物流系统等。为了提高系统性能和减少对数据库的访问压力&#xff0c;可以使用缓存来存储常用的地址信息&#xff0c;其中 Redis 是一个非常流行的选择。 本次在一个企业入…

就业信息|基于SprinBoot+vue的就业信息管理系统(源码+数据库+文档)

就业信息管理系统 目录 基于SprinBootvue的就业信息管理系统 一、前言 二、系统设计 三、系统功能设计 1前台功能模块 2后台功能模块 4.2.1管理员功能 4.2.2学生功能 4.2.3企业功能 4.2.4导师功能 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设…

[力扣]——70.爬楼梯

题目描述&#xff1a; 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 本题较为简单&#xff0c;主要用到递归思想 int fun(int n,int memo[]) {if(memo[n]!-1) //如果备忘录中已经有记录了…

学 Go 具体能干什么?

学习 Go (Golang) 后&#xff0c;你可以从事许多不同的工作和项目&#xff0c;Go 语言以其高性能、并发处理和简洁的语法而闻名&#xff0c;特别适合以下几个领域&#xff1a; 1. 后端开发 Go 在后端开发中非常流行&#xff0c;特别适合构建高性能的 Web 服务和 API。 Web 框…

安卓获取内部存储信息

目录 前言获取存储容量 前言 原生系统设置里的存储容量到底是怎么计算的&#xff0c;跟踪源码&#xff0c;涉及到VolumeInfo、StorageManagerVolumeProvider、PrivateStorageInfo、StorageStatsManager......等等&#xff0c;java上层没有办法使用简单的api获取到吗&#xff1f…

【全开源】分类记账小程序系统源码(ThinkPHP+FastAdmin+UniApp)

基于ThinkPHPFastAdminUniAppvk-uView-uiVue3.0开发的一款支持多人协作的记账本小程序&#xff0c;可用于家庭&#xff0c;团队&#xff0c;组织以及个人的日常收支情况记录&#xff0c;支持周月年度统计。 &#xff1a;智能管理您的财务生活 一、引言&#xff1a;财务智能化…

多线程编程(12)之HashMap1.8源码分析

之前已经分析过了一版1.7版本的HashMap&#xff0c;这里主要是来分析一下1.8HashMap源码。 一、HashMap数据结构 HashMap 是一个利用散列表&#xff08;哈希表&#xff09;原理来存储元素的集合&#xff0c;是根据Key value而直接进行访问的数 据结构。 在 JDK1.7 中&#xff…

Text Control 控件 中 Service Pack 3:MailMerge 支持 SVG 图像

图像的合并方式与报告模板中的合并字段相同。占位符在设计时添加&#xff0c;并与文件、数据库或内存中的数据合并。可以将图像对象添加到具有指定名称的模板中。数据列必须包含字节数组形式的二进制图像数据、System.Drawing.Image 类型的对象、文件名、十六进制或 Base64 编码…

产品经理-需求收集(二)

1. 什么是需求 指在一定的时期中&#xff0c;一定场景中&#xff0c;无论是心理上还是生理上的&#xff0c;用户有着某种“需要”&#xff0c;这种“需要”用户自己不一定知道的&#xff0c;有了这种“需要”后用户就有做某件事情的动机并促使达到其某种目的&#xff0c;这也就…

Python 开心消消乐

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

记一次绕过宝塔防火墙的BC站渗透

0x00 信息收集 由于主站存在云waf 一测就封 且初步测试不存在能用得上的洞 所以转战分站 希望能通过分站获得有价值的信息 这是一个查询代理帐号的站 url输入admin 自动跳转至后台 看这个参数 猜测可能是thinkCMF 0x01 getshell thinkcmf正好有一个RCE 可以尝试一下 ?afetc…

01.爬虫---初识网络爬虫

01.初识网络爬虫 1.什么是网络爬虫2.网络爬虫的类型3.网络爬虫的工作原理4.网络爬虫的应用场景5.网络爬虫的挑战与应对策略6.爬虫的合法性总结 1.什么是网络爬虫 网络爬虫&#xff0c;亦称网络蜘蛛或网络机器人&#xff0c;是一种能够自动地、系统地浏览和收集互联网上信息的程…

SpringValidation

一、概述&#xff1a; ​ JSR 303中提出了Bean Validation&#xff0c;表示JavaBean的校验&#xff0c;Hibernate Validation是其具体实现&#xff0c;并对其进行了一些扩展&#xff0c;添加了一些实用的自定义校验注解。 ​ Spring中集成了这些内容&#xff0c;你可以在Spri…

一文教你如何调用Ascend C算子

Ascend C是CANN针对算子开发场景推出的编程语言&#xff0c;原生支持C和C标准规范&#xff0c;兼具开发效率和运行性能。基于Ascend C编写的算子程序&#xff0c;通过编译器编译和运行时调度&#xff0c;运行在昇腾AI处理器上。使用Ascend C&#xff0c;开发者可以基于昇腾AI硬…

AC/DC电源模块:提供高质量的电力转换解决方案

BOSHIDA AC/DC电源模块&#xff1a;提供高质量的电力转换解决方案 AC/DC电源模块是一种电力转换器件&#xff0c;可以将交流电转换为直流电。它通常用于各种电子设备和系统中&#xff0c;提供高质量的电力转换解决方案。 AC/DC电源模块具有许多优点。首先&#xff0c;它能够提…

【学习笔记】计算机组成原理(七)

指令系统 文章目录 指令系统7.1 机器指令7.1.1 指令的一般格式7.1.2 指令字长 7.2 操作数类型和操作类型7.2.1 操作数类型7.2.2 数据在存储器中的存放方式7.2.3 操作类型 7.3 寻址方式7.3.1 指令寻址7.3.1.1 顺序寻址7.3.1.2 跳跃寻址 7.3.2 数据寻址7.3.2.1 立即寻址7.3.2.2 直…

探秘SpringBoot默认线程池:了解其运行原理与工作方式(@Async和ThreadPoolTaskExecutor)

文章目录 文章导图Spring封装的几种线程池SpringBoot默认线程池TaskExecutionAutoConfiguration&#xff08;SpringBoot 2.1后&#xff09;主要作用优势使用场景如果没有它 2.1版本以后如何查看参数方式一&#xff1a;通过Async注解--采用ThreadPoolTaskExecutordetermineAsync…