Flink之状态管理

Flink状态管理

  • 状态
    • 概述
    • 状态分类
  • 键控、按键分区状态
    • 概述
    • 值状态 ValueState
    • 列表状态 ListState
    • Map状态 MapState
    • 归约状态 ReducingState
    • 聚合状态 Aggregating State
  • 算子状态
    • 概述
    • 列表状态 ListState
    • 联合列表状态 UnionListState
    • 广播状态 Broadcast State
  • 状态有效期 (TTL)
    • 概述
    • StateTtlConfig 配置对象
    • 参数说明
    • 清理
    • 使用示例
  • 状态后端 State Backend
    • 概述
    • 可用状态后端
    • 状态后端的配置

状态

概述

在流处理任务中,数据会以连续的流的形式输入到Flink中,而状态计算允许我们跟踪和处理这些输入数据的状态信息。状态可以是任何需要记录和使用的数据,例如聚合计数、累积结果、窗口中的中间状态等。

Flink中的状态管理是指在流处理任务中对数据的状态进行有效管理和维护的过程。状态管理是非常重要的,因为它允许我们在流式处理中维护和操作数据的状态信息,以实现复杂的计算逻辑和应用需求。

图片来源于网络,如有侵权,联系删除

状态分类

在Flink中,Flink状态有两种:系统状态Managed State和原始状态Raw State。通常使用系统状态,而原始状态则需要自定义实现。

系统状态根据数据集是否按照某一个Key进行分区,将状态分为算子状态Operator State和按键分区状态Keyed State。

1.系统状态

由Flink管理的全局状态,可以在整个应用程序中共享。系统状态与算子或键无关,可以被整个应用程序中的所有算子访问和更新。

2.原始状态

原始状态是一种低级别的状态表示形式,它提供了一种灵活的方式来定义和管理状态。它允许开发人员自定义状态的存储和访问方式,以满足特定的需求。

3.算子状态

用于在算子之间维护中间结果、聚合状态等。它与具体的算子实例绑定,与其他算子实例的状态相互独立。算子状态是分布式的,可以在故障恢复时进行检查点和状态恢复。

一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

图片来源于网络,如有侵权,联系删除。
4.按键分区状态

与流的键相关联的状态,用于存储和管理与每个键相关的数据信息。按键分区状态能在Keyed Stream或Keyed ProcessFunction中使用。它会根据键将数据进行分区,保证相同键的数据会被同一个状态管理。

很多有状态的操作,如聚合、窗口都是要先做keyBy进行按键分区,之后任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。

(图片来源于网络,如有侵权,联系删除。)

键控、按键分区状态

概述

按键分区状态Keyed State是任务按照键key来访问和维护的状态。它就是以key为作用范围进行隔离。

注意:

使用按键分区状态必须基于Keyed Stream。没有进行keyBy分区的Data Stream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问按键分区状态。

Keyed State在Flink中分为不同类型,具体支持的状态类型如下所示:

ValueState<T>:存储和访问单个值的状态,通常是一个单一的状态值。它可以用于存储中间结果、累加器等ListState<T>:存储和访问元素列表的状态,通常用于按键分区的列表操作MapState<UK,UV>:存储和访问键值对的状态,通常用于需要以键-值对形式存储和检索数据的情况AggregatingState<IN,OUT>:使用用户定义的聚合函数来逐个聚合元素的状态,通常用于对数据进行聚合操作,如计算平均值ReducingState<T>:使用用户定义的reduce函数来逐个聚合元素的状态,通常用于聚合操作,如求和

值状态 ValueState

值状态(ValueState)是Flink中的一种状态类型,用于存储和访问单个值。它可以用于在状态中保存和维护一个单一的值。

值状态通常用于在状态中存储一些需要随时间更新的值,例如计数器、累加器、最大/最小值等。

接口如下:

// T是泛型,表示状态的数据内容可以是任何具体的数据类型
public interface ValueState<T> extends State {// 获取当前状态的值T value() throws IOException;// 对状态进行更新,传入的参数value就是要覆写的状态值void update(T var1) throws IOException;
}

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {// 需要传入状态的名称和类型public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {super(name, typeInfo, (Object)null);}	 
}

当前输入数据与上一条数据差值比较:

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ValueState<Integer> lastState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型lastState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastState", Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 取出上一条数据的水位值,注意Integer默认值是nullint lastValue = lastState.value() == null ? 0 : lastState.value();// 求水位线差值的绝对值>5的数据Integer currentValue = value.f1;if (Math.abs(currentValue - lastValue) > 5) {out.collect("窗口:" + value.f0 + " 数据:" + value + " 当前值" + currentValue + " 上一条数据值:" + lastValue + " 差值>5");}// 更新状态里的水位值lastState.update(currentValue);}}

输入测试数据:

>nc -lk 8086
key1,5
key1,7
key1,13
key1,20
key1,10

控制台输出结果:

窗口:key1 数据:(key1,13) 当前值13 上一条数据值:7 差值>5
窗口:key1 数据:(key1,20) 当前值20 上一条数据值:13 差值>5
窗口:key1 数据:(key1,10) 当前值10 上一条数据值:20 差值>5

列表状态 ListState

列表状态(ListState)是Flink中的一种状态类型,用于存储和访问元素列表。它可以用于在状态中保存和维护一组元素,并对列表中的元素进行添加、删除和访问操作。

列表状态通常用于需要在状态中保存多个元素的场景,例如累积计算、聚合操作或缓冲区管理等。

在ListState接口中同样有一个类型参数T,表示列表中数据的类型。

public interface ListState<T> extends MergingState<T, Iterable<T>> {void update(List<T> var1) throws Exception;void addAll(List<T> var1) throws Exception;
}

ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>update(List<T> values):传入一个列表values,直接对状态进行覆盖add(T value):在状态列表中添加一个元素valueaddAll(List<T> values):向列表中添加多个元素,以列表values形式传入void clear(): 清空List状态 本组数据

ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

定义一个描述列表状态的描述符。描述符指定状态的名称和类型,状态描述器构造方法如下

public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {super(name, new ListTypeInfo(elementTypeInfo), (Object)null);}
}    

取流中3个最大值,且排序

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ListState<Integer> listState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("listState", Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 来一条数据则存list状态里listState.add(value.f1);// 从list状态拿出来,得到一个IterableIterable<Integer> iterableList = listState.get();// 拷贝到List中List<Integer> list = new ArrayList<>();for (Integer val : iterableList) {list.add(val);}// 对List进行降序排序list.sort((o1, o2) -> o2 - o1);// list中的个数是连续变大的,一但超过3个就立即清理if (list.size() > 3) {// 元素清除,清除第4个list.remove(3);}out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 最大3个水位值:" + list.toString());// 更新list状态listState.update(list);}}
key1,1
key1,5
key1,7
key1,8
key1,9
keyBy:key1 当前数据:(key1,1) 最大3个水位值:[1]
keyBy:key1 当前数据:(key1,5) 最大3个水位值:[5, 1]
keyBy:key1 当前数据:(key1,7) 最大3个水位值:[7, 5, 1]
keyBy:key1 当前数据:(key1,8) 最大3个水位值:[8, 7, 5]
keyBy:key1 当前数据:(key1,9) 最大3个水位值:[9, 8, 7]

Map状态 MapState

Map 状态(MapState)是 Flink 中的一种状态类型,用于存储和访问键值对。它可以用于在状态中保存和维护一组键值对。

Map 状态通常用于需要根据键进行查找和更新的场景,例如缓存、索引、关联操作等。对应的是MapState<UK, UV>接口,有UK、UV两个泛型,分别表示保存的key和value的类型。

MapState提供了操作映射状态的方法,与Map的使用非常类似。另外,MapState也提供了获取整个映射相关信息的方法

public interface MapState<UK, UV> extends State {// 传入一个key作为参数,查询对应的value值UV get(UK var1) throws Exception;// 传入一个键值对,更新key对应的value值void put(UK var1, UV var2) throws Exception;// 将传入的映射map中所有的键值对,全部添加到映射状态中void putAll(Map<UK, UV> var1) throws Exception;// 将指定key对应的键值对删除void remove(UK var1) throws Exception;// 判断是否存在指定的key,返回一个boolean值boolean contains(UK var1) throws Exception;// 获取映射状态中所有的键值对Iterable<Map.Entry<UK, UV>> entries() throws Exception;// 获取映射状态中所有的键(key),返回一个可迭代Iterable类型Iterable<UK> keys() throws Exception;// 获取映射状态中所有的值(value),返回一个可迭代Iterable类型Iterable<UV> values() throws Exception;// 获取迭代器Iterator<Map.Entry<UK, UV>> iterator() throws Exception;// 判断映射是否为空,返回一个boolean值boolean isEmpty() throws Exception;
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态MapState<Integer, Integer> mapState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("mapState", Types.INT, Types.INT));}/*** 模拟统计 数字 出现频率计数*/@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 判断是否存在对应的keyInteger number = value.f1;if (mapState.contains(number)) {// 包含key,直接对value+1Integer count = mapState.get(number);mapState.put(number, ++count);} else {// 不包含key,初始化mapState.put(number, 1);}out.collect("keyBy:" + value.f0 + " 数字:" + number + " 出现次数:" + mapState.get(number));}}
nc -lk 8086
key1,1
key1,1
key1,2
key1,3
key1,2
key1,1
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:1 出现次数:2
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:1 出现次数:3

归约状态 ReducingState

归约状态(Reducing State)是 Flink 中一种特殊类型的状态,用于对输入流进行归约操作。归约操作将输入流中的元素逐个进行聚合,生成一个汇总的结果值。不同于普通的 Map、List 或 Value 状态,归约状态可以在接收到新的元素时,对当前的状态值进行相应的归约操作。

归约状态ReducingState类似于值状态,不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。

使用接口ReducingState,调用的方法类似于ListState,只不过它保存的只是一个聚合值,调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {private final ReduceFunction<T> reduceFunction;public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {super(name, typeInfo, (Object)null);this.reduceFunction = (ReduceFunction)Preconditions.checkNotNull(reduceFunction);}
}

使用归约状态来计算输入流中的累计和

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ReducingState<Integer> reducingState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、reduceFunction、存储类型reducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reducingState", new ReduceFunction<Integer>() {@Overridepublic Integer reduce(Integer value1, Integer value2) throws Exception {return value1 + value2;}}, Types.INT));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 来一条数据则添加到reducing状态里reducingState.add(value.f1);// 对本组的Reducing状态,获取结果Integer sum = reducingState.get();out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 水位值合计:" + sum);}}
key1,1
key1,2
key1,3
keyBy:key1 当前数据:(key1,1) 水位值合计:1
keyBy:key1 当前数据:(key1,2) 水位值合计:3
keyBy:key1 当前数据:(key1,3) 水位值合计:6

聚合状态 Aggregating State

聚合状态是Flink 中一种特殊类型的状态,用于对输入流进行聚合操作。聚合操作将输入流中的元素逐个进行聚合,并生成一个汇总的结果值。与归约状态不同,聚合状态可以在接收到新的元素时,根据自定义的聚合逻辑对当前的状态值进行增量聚合。

AggregatingState接口相关方法

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

与归约状态不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的
。里面通过一个累加器Accumulator来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {private final AggregateFunction<IN, ACC, OUT> aggFunction;public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> stateType) {super(name, stateType, (Object)null);this.aggFunction = (AggregateFunction)Preconditions.checkNotNull(aggFunction);}
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}/*** param1 键的类型* param2 输入类型* param3 输出元素的类型*/public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态AggregatingState<Integer, HashMap<Integer, Integer>> aggregatingState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、AggregateFunction、累加器类型aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>>("aggregatingState",new MyAggregateFunction(),TypeInformation.of(new TypeHint<HashMap<Integer, Integer>>() {})));}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 将水位值添加到聚合状态中aggregatingState.add(value.f1);// 从聚合状态中获取结果HashMap<Integer, Integer> res = aggregatingState.get();out.collect("keyBy:" + value.f0 + " 数字:" + value.f1 + " 出现次数:" + res.get(value.f1));}}/*** param1 聚合的值的类型 (输入值)* param2 累加器的类型 (中间聚合状态)* param3 聚合结果的类型*/public static class MyAggregateFunction implements AggregateFunction<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>> {// 创建累加器,类型HashMap<Integer, Integer>@Overridepublic HashMap<Integer, Integer> createAccumulator() {HashMap<Integer, Integer> map = new HashMap<>();return map;}@Overridepublic HashMap<Integer, Integer> add(Integer value, HashMap<Integer, Integer> accumulator) {if (accumulator.containsKey(value)) {Integer sum = accumulator.get(value) + 1;accumulator.put(value, sum);} else {accumulator.put(value, 1);}return accumulator;}@Overridepublic HashMap<Integer, Integer> getResult(HashMap<Integer, Integer> accumulator) {return accumulator;}@Overridepublic HashMap<Integer, Integer> merge(HashMap<Integer, Integer> a, HashMap<Integer, Integer> b) {return null;}}
key1,1
key1,2
key1,3
key1,2
key1,3
key1,2
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:3 出现次数:2
keyBy:key1 数字:2 出现次数:3

算子状态

概述

算子状态(Operator State)是 Flink 中一种用于保存和管理算子(Operator)状态的机制。算子状态通常用于在算子之间保持一些中间结果,或者用于保存全局信息。

算子状态是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个算子状态。

算子状态一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

在Flink中,算子任务可以分为无状态和有状态两种情况。

无状态算子

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。例如:基本转换算子map、filter、flatMap等计算时不依赖其他数据,就都属于无状态的算子。

有状态算子

有状态的算子任务,除当前数据之外,还需要一些其他数据来得到计算结果。其他数据就是所谓的状态。例如:聚合算子、窗口算子都属于有状态的算子。

算子状态有以下几个特点:

算子状态是与算子实例绑定的,每个算子实例都会维护自己的状态。这意味着在并行计算中,每个并行实例都会有独立的状态算子状态可以是一种类型,也可以是多种类型的组合。常见的算子状态类型包括 ValueState、ListState、MapState 等算子状态可以在算子实例之间进行快速的备份和恢复,以保证程序的容错性算子状态可以存储在内存中,也可以通过配置选择将其存储在外部持久化存储系统中,如 RocksDB

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

列表状态 ListState

在算子状态的上下文中,不会按键分别处理状态,每一个并行子任务上会保留一个列表

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个大列表,然后再均匀地分配给所有并行任务。

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流DataStreamSource<String> source = env.socketTextStream("IP", 8086);source.map(new MyMapFunction()).print();env.execute();}// 实现CheckpointedFunction接口public static class MyMapFunction implements MapFunction<String, Integer>, CheckpointedFunction {// 本地变量private Integer count = 0;// 定义状态private ListState<Integer> state;@Overridepublic Integer map(String value) throws Exception {return ++count;}/*** 本地变量持久化:将本地变量拷贝到算子状态中,开启checkpoint时才会调用*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 清空算子状态state.clear();// 将 本地变量 添加到 算子状态 中state.add(count);}/*** 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 从上下文初始化 子状态state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Integer>("state", Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count += val;}}}}

初始化本地变量方法与并行度设置有关

initializeState...
initializeState...

输入测试数据

1
2
3
4
1> 1
2> 1
1> 2
2> 2

联合列表状态 UnionListState

它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

并行度缩放之后的并行子任务就获取到了联合后完整的大列表,可以自行选择要使用的状态项和要丢弃的状态项。

        /*** 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 从上下文初始化 子状态state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("union-state", Types.INT));// 从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Integer val : state.get()) {count += val;}}}

广播状态 Broadcast State

广播状态是 Flink 中一种特殊的算子状态类型,可用于在流处理任务中将数据广播到所有并行任务中共享和访问。它适用于将少量的全局信息广播到算子的每个实例,以便进行更灵活的计算。

因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展。而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接删除,因为状态都是复制出来的,并不会丢失

广播状态具有以下特点:

广播状态只需要占用少量的内存,因为它通常用于存储比较小的全局数据或配置信息广播状态在整个任务中共享,使得每个算子实例都可以访问广播状态中的数据,而无需进行网络通信广播状态在任务开始时被广播并分发到每个算子实例,保持数据的一致性

更改广播状态示例

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> sourceMap = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}});// 广播配置流DataStreamSource<String> dataStreamSource = env.socketTextStream("IP", 8087);// 使用给定的名称和给定的类型信息新建一个MapStateDescriptorMapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);// 得到广播流BroadcastStream<String> broadcastStream = dataStreamSource.broadcast(broadcastMapState);// 数据流和广播配置流进行关联BroadcastConnectedStream<Tuple2<String, Integer>, String> broadcastConnectedStream = sourceMap.connect(broadcastStream);// 调用 processbroadcastConnectedStream.process(new BroadcastProcessFunction<Tuple2<String, Integer>, String, String>() {/*** 数据流的处理* 数据流只能读取广播状态,不能修改* @param value 非广播侧的输入类型* @param ctx 广播端的输入类型* @param out 运算符的输出类型*/@Overridepublic void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 通过上下文获取广播状态,取出值ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer configValue = broadcastState.get("myConfig");// 注意:刚启动时,可能是数据流的第一条数据先来configValue = (configValue == null ? 0 : configValue);if (value.f1 > configValue) {out.collect("输入数字:" + value.f1 + " > 广播状态值:" + configValue);} else {out.collect("输入数字:" + value.f1 + " <= 广播状态值:" + configValue);}}/*** 广播后配置流的处理* 只有广播流才能修改广播状态*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// 通过上下文获取广播状态,往里面写数据BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("myConfig", Integer.valueOf(value));}}).print();env.execute();}

输入测试数据

nc -lk 8086
key1,1
key1,2
key1,3

输出:

1> 输入数字:1 > 广播状态值:0
2> 输入数字:2 > 广播状态值:0
1> 输入数字:3 > 广播状态值:0

更改广播状态

nc -lk 8087
5

输入测试数据

nc -lk 8086
key1,6
key1,8

输出:

2> 输入数字:6 > 广播状态值:5
1> 输入数字:8 > 广播状态值:5

状态有效期 (TTL)

概述

状态效期、生存时间(State TTL,Time-to-Live)是 Flink 中的一个功能,用于为状态设置过期时间。通过设置状态生存时间,可以自动清理过期的状态数据,避免无限增长的状态。

任何类型的keyed state都可以有 有效期 (TTL)。如果配置了TTL且状态值已过期,则会尽最大可能清除对应的值

所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。

StateTtlConfig 配置对象

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的enableTimeToLive()方法启动TTL功能。

创建一个StateTtlConfig配置对象

        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();

启动TTL功能

        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("MyState", String.class);valueStateDescriptor.enableTimeToLive(stateTtlConfig);

参数说明

newBuilder()

状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig。方法需要传入一个Time作为参数,这就是设定的状态生存时间。

在这里插入图片描述

setUpdateType()

设置更新类型。更新类型指定了什么时候更新状态失效时间

在这里插入图片描述

DisabledTTL 已禁用。这意味着状态不会过期,它将一直保持有效,直到显式删除或状态存储由于其他原因而被清理OnCreateAndWrite:表示只有创建状态和更改状态(写操作)时更新失效时间。配置默认为OnCreateAndWriteOnReadAndWrite:表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间

setStateVisibility()

设置状态的可见性。状态可见性是指因为清除操作并不是实时的,当状态过期之后还可能继续存在,如果对它进行访问,能否正常读取到是一个问题

在这里插入图片描述

NeverReturnExpired:默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除,不能继续读取ReturnExpireDefNotCleanedUp:如果过期状态还存在,就返回它的值

清理

过期数据的清理

默认情况下,过期数据会在读取的时候被删除,同时会有后台线程定期清理(StateBackend支持)。可以通过StateTtlConfig配置关闭后台清理:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();

全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();

增量数据清理

在状态访问或处理时进行,会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

 StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1))/*** @cleanupSize 每次清理时检查状态的条目数,在每个状态访问时触发* @runCleanupForEveryRecord  表示是否在处理每条记录时触发清理*/.cleanupIncrementally(10, true).build();

在RocksDB压缩时清理

如果使用RocksDBstatebackend,则会启用Flink为RocksDB定制的压缩过滤器。RocksDB会周期性的对数据进行合并压缩从而减少存储空间。Flink提供的RocksDB压缩过滤器会在压缩时过滤掉已经过期的状态数据。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();

使用示例

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)// 将输入数据转换为Tuple2.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return Tuple2.of(split[0], Integer.valueOf(split[1]));}})// 指定 watermark策略.assignTimestampsAndWatermarks(// 定义Watermark策略WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));// keyBy分区KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);keyByStream.process(new MyKeyedProcessFunction()).print();env.execute();}public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {// 定义状态ValueState<Integer> lastState;/*** 在open方法中,初始化状态** @param configuration* @throws Exception*/@Overridepublic void open(Configuration configuration) throws Exception {super.open(configuration);// 创建StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 时更新过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 时更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// 状态描述器 启用TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {// 获取状态值Integer lastValue = lastState.value();out.collect("keyBy:" + value.f0 + " 状态值: " + lastValue);// 更新状态lastState.update(value.f1);}}

快速输入测试数据

key1,1
key1,2
key1,4
keyBy:key1 状态值: null
keyBy:key1 状态值: 1
keyBy:key1 状态值: 2

等待超过5秒输入测试数据

key1,6
keyBy:key1 状态值: null

状态后端 State Backend

概述

状态后端是 Flink 中用于管理和持久化状态数据的机制。状态后端负责将算子状态和键控状态Keyed State存储在可靠且可恢复的存储系统中,并提供对状态数据的读取和写入操作。

状态后端主要负责管理本地状态的存储方式和位置

可用状态后端

Flink内置了以下这些开箱即用的state backends :

如果不设置,默认使用HashMapStateBackend。两种状态后端最大的区别,就在于本地状态存放在哪里

HashMapStateBackend: 哈希表状态后端EmbeddedRocksDBStateBackend:内嵌RocksDB状态后端

1.HashMapStateBackend

在HashMapStateBackend内部,数据以Java对象的形式存储在堆中。Key/value形式的状态和窗口算子会持有一个hash table,其中存储着状态值、触发器。

具有以下特点:

高性能:由于状态存储在内存中,哈希表状态后端提供极快的数据读取和写入性能低延迟:状态的访问速度非常快,因为无需进行磁盘或网络访问低容错性:哈希表状态后端不提供持久化能力,即在故障发生时可能会丢失状态数据。适用于开发和调试环境,或对数据一致性要求较低的场景

2.EmbeddedRocksDBStateBackend

将状态数据存储在硬盘上的RocksDB数据库中,RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置开启后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。

RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

不同于HashMapStateBackend中的java对象,状态数据被以序列化字节数组的方式存储,需要序列化、反序列化,因此key之间的比较是以字节序的形式进行而不是使用Java的调用.hashCode()和.equals()方法。

执行是异步快照,不会因为保存检查点而阻塞数据的处理,并且还提供了增量式保存检查点的机制,在很多情况下可以大大提升保存效率。

具有以下特点:

持久化和可恢复性:内嵌RocksDB状态后端可将状态数据持久化到磁盘,并在故障发生时能够恢复状态数据高容量:由于状态存储在磁盘上,内嵌RocksDB状态后端可以处理大规模的状态数据中等性能:相较于哈希表状态后端,内嵌RocksDB状态后端的读写性能略低。但由于RocksDB是一个高效的键值存储引擎,它仍然提供了相对较好的读写性能

状态后端的配置

默认状态后端是由集群配置文件flink-conf.yaml指定的,配置的键名称为state.backend

默认配置对集群上运行的所有作业都有效,可以通过更改配置值来改变默认的状态后端。还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

1.配置默认全局的状态后端

flink-conf.yaml中,可以使用state.backend来配置默认状态后端。

# 默认状态后端,哈希表状态后端
state.backend.type: hashmap# 内嵌RocksDB状态后端
state.backend.type: rocksdb# 定义检查点和元数据写入的目录
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints

2.设置每个Job的状态后端

使用hashmap状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HashMapStateBackend hashMapStateBackend = new HashMapStateBackend();env.setStateBackend(hashMapStateBackend);

使用rocksdb状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();env.setStateBackend(embeddedRocksDBStateBackend);

注意:Flink发行版中默认包含了RocksDB(解压的Flink安装包),在IDE中使用rocksdb状态后端,需要为Flink项目添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

3.提交参数指定

flink run-application -t yarn-application-p 2 -Dstate.backend.type=rocksdb -c 全类名 jar包

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/185786.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pytorch(小土堆)深度学习

第五节课讲项目的创建和对比 第六节&#xff1a;Dataset,Dataloader Dataset提供一种方式区获取数据及其label(如何获取每一个数据及其label&#xff0c;告诉我们总共有多少的数据) Dataloader为后面的网络提供不同的数据形式 第七节&#xff1a;Dataset类代码实战 显示图片 f…

WebSocket在node端和客户端的使用

摘要 如果想要实现一个聊天的功能&#xff0c;就会想到使用WebSocket来搭建。那如果没有WebSocet的时候&#xff0c;我们会以什么样的思路来实现聊天功能呢&#xff1f; 假如有一个A页面 和 B页面进行通信&#xff0c;当A发送信息后&#xff0c;我们可以将信息存储在文件或者…

安防监控EasyCVR视频汇聚平台无法接入Ehome5.0是什么原因?该如何解决?

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同&#xff0c;支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。安防平台EasyCVR拓展性强&#xff0c;视频能力丰富&#xff0c;具体可实现视频监控直播、视频轮播、视频录像、云存储、回放…

2023.11.09 homework (2)

【七年级上数学】 教别人也是教自己&#xff0c;总结下&#xff1a; 13&#xff09;找规律的题目&#xff0c;累加题目&#xff0c;要整体看&#xff0c;不然不容易算出来&#xff0c;求最大值&#xff0c;那么就是【最大值集群和】减去【最小集群和】就是最大值 9-12&#x…

模态对话框和非模态对话框

创建到堆区这样非模态对话框就不会一闪而过 .exec使程序进入阻塞状态 ()[]{}lambda表达式 55号属性可以在对话框关闭的时候将堆区的内存释放掉从而防止内存泄露

在linux安装单机版hadoop-3.3.6

一、下载hadoop https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/core/hadoop-3.3.6/ 二、配置环境变量 1、配置java环境变量 2、配置hadoop环境变量 export HADOOP_HOME/usr/local/bigdata/hadoop-3.3.6 export HBASE_HOME/usr/local/bigdata/hbase-2.5.6 export JA…

手术训练系统项目

★ 手术训练系统项目 项目描述&#xff1a;手术训练系统&#xff0c;它提供了多项功能&#xff0c;包括账户登录与创建、数据库与账户管理、课程管理、小组管理、成绩统计、证书发布、训练和系统设置。 职责描述: 1、训练功能开发&#xff08;任务概述、任务指导、评分规则、评…

71 内网安全-域横向网络传输应用层隧道技术

目录 必备知识点&#xff1a;1.代理和隧道技术区别?2.隧道技术为了解决什么?3.隧道技术前期的必备条件? 演示案例:网络传输应用层检测连通性-检测网络层ICMP隧道Ptunnel使用-检测利用传输层转发隧道Portmap使用-检测,利用传输层转发隧道Netcat使用-检测,利用,功能应用层DNS隧…

Jmeter分布式性能测试细节+常见问题解决,资深老鸟带你避坑...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Jmeter分布式测试…

【TiDB】TiDB CLuster部署

目录 0 大纲 一 集群部署工具TiUP简介 1 TiUP 简介 2 TiUP使用 3 TiUP使用举例 二 TiDB Cluster安装配置需求 1 生产环境硬件需求 2 操作系统需求 三 TIDB部署 1 软硬件需求以及前置检查​编辑 2 安装TiUP 组件 ​3 集群拓扑文件 4 执行部署命令 &#xff08;1&…

6-爬虫-scrapy解析数据(使用css选择器解析数据、xpath 解析数据)、 配置文件

1 scrapy解析数据 1.1 使用css选择器解析数据 1.2 xpath 解析数据 2 配置文件 3 整站爬取博客–》爬取详情–》数据传递 scrapy 爬虫框架补充 # 1 打码平台---》破解验证码-数字字母&#xff1a;ddddocr-计算题&#xff0c;滑块&#xff0c;成语。。。-云打码&#xff0c;超…

HK WEB3 MONTH Polkadot Hong Kong 火热报名中!

HK Web3 Month 11月除了香港金融科技周外&#xff0c;HK Web3 Month又是一大盛事&#xff0c;从10月29日开始开幕直到11月18日结束。此次将齐聚世界各地的Web3产业从业者、开发者、社群成员和学生来参与本次盛会。除外&#xff0c;超过75位产业知名的讲者与超过50场工作坊将为…

大数据毕业设计选题推荐-农作物观测站综合监控平台-Hadoop-Spark-Hive

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

linux基础:3.linux基础环境开发工具和配置。

linux基础环境开发工具和配置 一.学习yum工具进行软件安装&#xff1a;1.什么是yum&#xff1a;2.查看软件包&#xff1a;3.安装和删除&#xff1a;4.yum生态&#xff1a; 二.vim的使用&#xff1a;一.快速介绍一下vim二.vim正常模式&#xff1a;2-1&#xff1a;命令模式1.光标…

strtok函数详解:字符串【分割】的利器

目录 一&#xff0c;strtok函数简介 二&#xff0c;strtok函数的用法 三&#xff0c;strtok函数的注意事项 一&#xff0c;strtok函数简介 strtok函数可以帮助我们将一个字符串按照指定的分隔符进行分割&#xff0c;从而得到我们想要的子字符串。 &#x1f342;函数头文件&am…

【Qt绘制小猪】以建造者模式绘制小猪

效果 学以致用&#xff0c;使用设计模式之建造者模式绘制小猪。 代码 接口&#xff1a;申明绘制的步骤 PigBuilder.h #ifndef PIGBUILDER_H #define PIGBUILDER_H#include <QObject> #include <QPainter>class PigBuilder : public QObject {Q_OBJECT public:ex…

记录C# WinForm项目调用Rust生成的dll库

一、开发环境 1.RustRover (version&#xff1a;2023.3 EAP) 2.Visual Studio 2019 (version&#xff1a;16.11.30) 3.Windows 10 64位 OS 4.WinR&#xff1a;控制台程序&#xff0c;cmd.exe 二、使用RustRover编译Rust脚本为dll 1.下载安装Rust&#xff0c;https://www.…

Spark大数据应用实战

系列文章目录 送书第一期 《用户画像&#xff1a;平台构建与业务实践》 送书活动之抽奖工具的打造 《获取博客评论用户抽取幸运中奖者》 送书第二期 《Spring Cloud Alibaba核心技术与实战案例》 送书第三期 《深入浅出Java虚拟机》 送书第四期 《AI时代项目经理成长之道》 …

计算机丢失mfc100.dll如何恢复,详细解析mfc100.dll文件丢失解决方法

在计算机使用过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;比如“mfc100.dll丢失”。这是因为动态链接库&#xff08;DLL&#xff09;文件是Windows操作系统的重要组成部分&#xff0c;它们包含了许多程序运行所需的函数和数据。当这些DLL文件丢失或损坏时&#x…

Selenium爬取内容并存储至MySQL数据库

前面我通过一篇文章讲述了如何爬取博客摘要等信息。通常,在使用Selenium爬虫爬取数据后,需要存储在TXT文本中,但是这是很难进行数据处理和数据分析的。这篇文章主要讲述通过Selenium爬取我的个人博客信息,然后存储在数据库MySQL中,以便对数据进行分析,比如分析哪个时间段…