详解 Flink 的状态管理

一、Flink 状态介绍

1. 流处理的无状态和有状态

  • 无状态的流处理:根据每一次当前输入的数据直接转换输出结果的过程,在处理中只需要观察每个输入的独立事件。例如, 将一个字符串类型的数据拆分开作为元组输出或将每个输入的数值加 1 后输出。Flink 中的基本转换算子 (map、filter、flatMap 等) 在计算时不依赖其他数据,所以都属于无状态的算子。

在这里插入图片描述

  • 有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。

    在这里插入图片描述

2. Flink 的状态管理

  • 在传统的事务型处理架构中,状态数据一般是保存在数据库中的,在业务处理过程中与数据库交互进行状态的读取和更新;但对于大数据实时处理架构来说,在业务处理时频繁地读写外部数据库会造成性能达不到要求,因此不能使用数据库进行状态管理
  • 在实时流处理中一般将状态直接保存在内存中来保证性能,但必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题随之产生
  • Flink 拥有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态一致性、状态的高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整。开发者只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上

二、Flink 状态分类

1. 托管状态

Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现

1.1 算子状态

Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少

在这里插入图片描述

  • 由同一并行任务所处理的所有数据都可以访问到相同的算子状态
  • 算子状态对于同一任务而言是共享的
  • 算子状态不能由相同或不同算子的另一个任务访问
1.1.1 算子状态数据结构
  • 列表状态(List state):将状态表示为一组数据的列表
  • 联合列表状态(Union list state):也是将状态表示为一组数据的列表。与列表状态的区别在于,在发生故障时或者从保存点(savepoint)启动应用程序时恢复的方式不同
  • 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
1.1.2 案例
public class TestFlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的map算子,用于统计输入数据个数DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());resultStream.print();env.execute();}//定义有状态的 map 操作//实现 ListCheckpointed 接口,泛型为状态数据类型public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {//定义一个本地变量作为状态private Integer count = 0;@Overridepublic Integer map(SensorReading value) throws Exception {count++;return count;}//对状态做快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {return Collections.singletonList(count);}//容错恢复状态@Overridepublic void restoreState(List<Integer> state) throws Exception {for(Integer num : state) {count += num;}}}}
1.2 按键分区状态

Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用

在这里插入图片描述

  • 在进行按键分区(keyBy)之后,具有相同 key 的所有数据,都会分配到同一个并行子任务中,这个任务会维护和处理这个 key 对应的状态实例
  • 一个并行子任务可能会处理多个 key 的数据,所以该任务会为每个 key 都维护一个状态实例
  • 在底层,同一个并行子任务的所有 KeyedState 会根据 key 保存成键值对(key-value)的形式,当一条数据到来时,任务会自动将状态的访问范围限定为当前数据的 key,并从键值对(key-value)存储中读取出对应的状态值
  • 具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的
  • 在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN,  OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • 值状态:ValueState<T>,将状态表示为单个的值,值的类型为 T
    • ValueState.value():获取状态值
    • ValueState.update(T value):添加或更新状态值
    • ValueState.clear():清空操作
  • 列表状态:ListState<T>,将状态表示为一组数据的列表,列表里的元素的数据类型为 T
    • ListState.add(T value):追加状态值
    • ListState.addAll(List<T> values):追加状态值列表
    • ListState.get():获取状态值的 Iterable<T>
    • ListState.update(List<T> values):更新状态值列表
    • ListState.clear():清空操作
  • 映射状态:MapState<K, V>,将状态表示为一组 Key-Value 对
    • MapState.get(UK key):获取状态值
    • MapState.put(UK key , UV value):添加或更新状态值
    • MapState.contains(UK key):判断状态值是否存在
    • MapState.remove(UK key):删除状态值
    • MapState.clear():清空操作
  • 聚合状态:ReducingState<T>AggregatingState<I, O>,将状态表示为一个用于聚合操作的列表
    • ReducingState.add():聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理
    • ReducingState.clear():清空操作,AggregatingState 同理
1.2.2 案例
/**按键分区状态的使用步骤:1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});/*需求:自定义有状态的map算子,按sensor_id统计个数*///使用按键分区状态必须先进行keyByDataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());resultStream.print();env.execute();}//使用继承富函数类的方式自定义MapFunctionpublic static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {//定义一个值状态属性private ValueState<Integer> myValueState;//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));}@Overridepublic Integer map(SensorReading value) throws Exception {//获取状态值Integer count = myValueState.value();if(count == null) {count = 0;}count++;//更新状态值myValueState.update(count);return count;}}
}

2. 原始状态

Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复

  • Flink 不会对原始状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
  • 只有在遇到托管状态无法实现的特殊需求时,才考虑使用原始状态;一般情况下不推荐使用

三、Flink 状态编程案例

/**需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警//报警信息:sensor_id,前一次温度值,当前温度值DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));warningStream.print();env.execute();}//使用继承富函数类的方式自定义FlatMapFunctionpublic static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {//定义温度差阈值属性private Double threshold;//定义值状态属性,保存上一次的温度值private ValueState<Double> lastTempState;public TempChangeWarning(Double threshold) {this.threshold = threshold;}//在open方法中实例化值状态@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));}//重写flatMap方法@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {//获取上一次温度状态值Double lastTemp = lastTempState.value();//如果状态值不为null,则进行差值判断if(lastTemp != null) {Double diff = Math.abs(lastTemp - value.getTemperature());//差值超过阈值,则输出报警信息if(diff >= threshold) {out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}}//更新状态值lastTempState.update(value.getTemperature());}//在close方法中清空状态@Overridepublic void close() throws Exception {lastTempState.clear();}}
}

四、Flink 状态后端

State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件

1. 介绍

​ 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2. 分类

  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
    在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。

  • FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。

  • RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>1.10.1</version>
    </dependency>
    

3. 配置

3.1 配置文件配置
  • 进入 flink 安装目录下的 conf 目录,打开 flink-conf.yaml 文件

    cd /opt/module/flink/conf
    vim flink-conf.yaml
    
  • 在文件中的 Fault tolerance and checkpointing 部分进行配置

    #Fault tolerance and checkpointing
    #============================================================
    state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb#state.checkpoints.dir: hdfs://namenode:port/flink/checkpointsjobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
    
3.2 代码配置

在代码中为每个作业单独配置状态后端

public class TestStatebackend {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//配置状态后端//1.MemoryStateBackendenv.setStateBackend(new MemoryStateBackend());//2.FsStateBackendenv.setStateBackend(new FsStateBackend("hdfs://......"));//3.RocksDBStateBackend,需要先引入依赖env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));});dataStream.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/345146.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入探索:十种流行的深度神经网络及其运作原理

算法 深入探索&#xff1a;十种流行的深度神经网络及其运作原理一、卷积神经网络&#xff08;CNN&#xff09;基本原理工作方式 二、循环神经网络&#xff08;RNN&#xff09;基本原理工作方式 三、长短期记忆网络&#xff08;LSTM&#xff09;基本原理工作方式 四、门控循环单…

使用Python创建Word文档

使用Python创建Word文档 安装python-docx库创建Word文档代码效果 在这篇文章中&#xff0c;我们将介绍如何使用 Python创建一个Word文档。首先&#xff0c;我们需要安装python-docx库&#xff0c;然后通过一段简单的代码示例展示如何创建和编辑Word文档。 安装python-docx库 …

C++使用thread_local实现每个线程下的单例

对于一个类&#xff0c;想要在每个线程种有且只有一个实例对象&#xff0c;且线程之间不共享该实例&#xff0c;可以按照单例模式的写法&#xff0c;同时使用C11提供的thread_local关键字实现。 在单例模式的基础上&#xff0c;使用thread_local关键字修饰单例的instance&…

CasaOS玩客云如何部署小雅AList并结合内网穿透远程访问海量资源

文章目录 前言1. 本地部署AList2. AList挂载网盘3. 部署小雅alist3.1 Token获取3.2 部署小雅3.3 挂载小雅alist到AList中 4. Cpolar内网穿透安装5. 创建公网地址6. 配置固定公网地址 前言 本文主要介绍如何在安装了CasaOS的玩客云主机中部署小雅AList&#xff0c;并在AList中挂…

IT闲谈-IMD是什么,有什么优势

目录 一、引言二、IDM是什么&#xff1f;三、IDM的优势1. 高速下载2. 稳定性强3. 强大的任务管理4. 视频下载5. 浏览器整合 四、应用场景1. 商务办公2. 教育学习3. 娱乐休闲 总结 一、引言 在数字化时代&#xff0c;下载管理器已成为我们日常工作和生活中不可或缺的工具。而在…

【CentOS 7】CentOS 7极致指南:高级部署PyCharm 2022.3.3专业版,实现定制化配置与无缝桌面集成

【CentOS 7】CentOS 7极致指南&#xff1a;高级部署PyCharm 2022.3.3专业版&#xff0c;实现定制化配置与无缝桌面集成 大家好 我是寸铁&#x1f44a; 总结了一篇CentOS 7极致指南&#xff1a;高级部署PyCharm 2022.3.3专业版&#xff0c;实现定制化配置与无缝桌面集成✨ 喜欢的…

关于CodeCombat(沙漠)布朗噪声的攻略

关于CodeCombat(沙漠)//布朗噪声的攻略 总的来说怎么猥琐怎么来 1.走到墙角骷髅看不到的位置&#xff0c;让宠物制造噪音&#xff0c;然后英雄走过去&#xff0c;就是这样没错&#xff08;坐标之类能明白) 最后看看运行结果吧 Rec 0002 希望天天开心

把chatgpt当实习生,进行matlab gui程序编程

最近朋友有个项目需要整点matlab代码&#xff0c;无奈自己对matlab这种工科的软件完全是外行&#xff0c;无奈只有求助gpt这种AI助手了。大神们告诉我们&#xff0c;chatgpt等的助手已经是大学实习生水平啦&#xff0c;通过多轮指令交互就可以让他帮你完成工作啦&#xff01;所…

如何学习Golang语言!

第一部分&#xff1a;Go语言概述 起源与设计哲学&#xff1a;Go语言由Robert Griesemer、Rob Pike和Ken Thompson三位Google工程师设计&#xff0c;旨在解决现代编程中的一些常见问题&#xff0c;如编译速度、运行效率和并发编程。主要特点&#xff1a;Go语言的语法简单、编译…

Django Forbidden (CSRF cookie not set.)解决办法

解决办法就是在setting.py文件中注释&#xff1a; django.middleware.csrf.CsrfViewMiddleware, 这个中间件是为了防止跨站请求伪造的&#xff0c;平时用网页表单请求时&#xff0c;post提交是没有问题的&#xff0c;但是用api调用时就会被禁止&#xff0c;为了能使用接口调用…

C#发送邮件的SMTP配置方法?如何群发邮件?

C#发送邮件安全性如何保障&#xff1f;C#怎么配置实现发送邮件&#xff1f; 在C#开发中&#xff0c;发送电子邮件是一个常见的需求。无论是用于注册确认、密码重置还是其他通知功能&#xff0c;SMTP&#xff08;简单邮件传输协议&#xff09;都是实现这一功能的关键。下面&…

Codeforces Round 949 (Div. 2) A~D

A. Turtle and Piggy Are Playing a Game &#xff08;思维&#xff09; 题意&#xff1a; 给出一个整数 x x x &#xff0c;使得 l ≤ x ≤ r l \le x \le r l≤x≤r &#xff0c;其中 l , r l, r l,r 为给定值。同时保证 2 l ≤ r 2l \le r 2l≤r 。 执行以下操作&…

【Docker系列】跨平台 Docker 镜像构建:深入理解`--platform`参数

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

!力扣102. 二叉树的层序遍历

给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]] /*** Definition for…

计算机网络 ——网络层(IPv4地址)

计算机网络 ——网络层&#xff08;IPv4地址&#xff09; 什么是IPv4地址IP地址的分类特殊的IP地址 查看自己的IPv4地址 我们今天来看IPv4地址&#xff1a; 什么是IPv4地址 IPv4&#xff08;Internet Protocol version 4&#xff09;是第四版互联网协议&#xff0c;是第一个被…

电调, GPS与飞塔

电调油门行程校准&#xff1a; 断电-----油门推到最高-------电调上电-------滴滴------油门推到最低---滴滴滴---校准完成。 http://【【教程】油门行程校准&#xff08;航模&#xff0c;电机&#xff0c;电调&#xff09;】https://www.bilibili.com/video/BV1yJ411J7aX?v…

【JS】理解闭包及其应用

历史小剧场 明朝灭亡&#xff0c;并非是简单的政治问题&#xff0c;事实上&#xff0c;这是世界经济史上的一个重要案例。 所谓没钱&#xff0c;就是没有白银。----《明朝那些事儿》 什么是闭包&#xff1f; 闭包就是指有权访问另一个函数作用域中变量的函数 闭包变量存储位置&…

【深度学习】深度学习之巅:在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境

【深度学习】深度学习之巅&#xff1a;在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境 大家好 我是寸铁&#x1f44a; 总结了一篇【深度学习】深度学习之巅&#xff1a;在 CentOS 7 上打造完美Python 3.10 与 PyTorch 2.3.0 环境✨ 喜欢的小伙伴可以点点关注 &#…

有序二叉树java实现

类实现&#xff1a; package 树;import java.util.LinkedList; import java.util.Queue;public class BinaryTree {public TreeNode root;//插入public void insert(int value){//插入成功之后要return结束方法TreeNode node new TreeNode(value);//如果root为空的话插入if(r…

山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十七)- 微服务(7)

11.1 : 同步调用的问题 11.2 异步通讯的优缺点 11.3 MQ MQ就是事件驱动架构中的Broker 安装MQ docker run \-e RABBITMQ_DEFAULT_USERxxxx \-e RABBITMQ_DEFAULT_PASSxxxxx \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management 浏览器访问1…