Flink系统知识讲解之:容错与State状态管理

Flink系统知识之:容错与State状态管理

状态在Flink中叫作State,用来保存中间计算结果或者缓存数据。根据是否需要保存中间结果,分为无状态计算和有状态计算。对于流计算而言,事件持续不断地产生,如果每次计算都是相互独立的,不依赖于上下游的事件,则是无状态计算。如果计算需要依赖于之前或者后续的事件,则是有状态计算。State是实现有状态计算下的Exactly-Once的基础。

Flink的State提供了对State的操作接口,向上对接Flink的DataStream API,让用户在开发Flink应用的时候,可以将临时数据保存到State中,从State中读取数据。在运行的时候,在运行层面上与算子、Function体系融合,自动对State进行备份(checkpoint),一旦出现异常能够从保存的State中恢复状态,实现Exactly-Once。

状态类型

按照数据结构的不同,Flink中定义了多种State,应用于不同的场景,具体如下。
(1)ValueState
即类型为T的单值状态。这个状态与对应的Key绑定,是最简单的状态。可以通过update方法更新状态值,通过value()方法获取动态值。
(2)ListState
即Key上的状态值为一个列表。可以通过add方法往列表中添加值,也可以通过get()方法返回一个Iterable来遍历状态值。
(3)ReducingState
这种状态通过用户传入的reduceFunction,每次调用add方法添加值时,会调用reduceFunction,最后合并到一个单一的状态值。
(4)AggregatingState<IN, OUT>
聚合State,和(3)不同的是,这里聚合的类型可以是不同的元素类型,使用add(IN)来加入元素,并使用AggregateFunction函数计算聚合结果。
(5)MapState<UK, UV>
使用Map存储Key-Value对,通过put(UK, UV)或者putAll(Map<UK, UV>)来添加,使用get(UK)来获取。
(6)FoldingState<T, ACC>
跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同。已被标记为废弃,不建议使用。

KeyedState和OperatorState

State按照是否有Key划分为KeyedState和OperatorState两种。

按是否有Key划分支持的State
KeyedStateValueState
ListState
ReducingState
AggregatingState
MapState
FoldingState
OperatorStateListState
KeyedState

在KeyedStream中使用。状态是跟特定的Key绑定的,即KeyedStream流上的每一个Key对应一个State对象。在这种情况下,可以通过getRuntimeContext.getState方法来获取每个key绑定的对应的State对象。比如,假设正在处理一个流并且对 key 进行分组,并且有一个 ValueState 来存储每个 key 的最后修改时间,那么每个唯一的 key 都会有一个独立的 ValueState 实例。
关于 Keyed State 的几点关键信息:

  • Keyed State 只能用于 KeyedStream,无法在非键控的流或操作符中使用。
  • 同一时间,一个 Keyed State 只能访问当前处理的事件的 key 的状态。
算子状态OperatorState

与KeyedState不同,OperatorState跟一个特定算子的一个实例绑定,整个算子只对应一个State实例。Operator State 可以用于保存操作符级别的信息,此信息跨所有输入数据存在。它与特定的键无关。在有状态操作符中使用 Operator State 可以用于存储和检索状态信息。例如,保存 Kafka source 的 offset 就是使用 Operator State。
Flink 使用 Operator State 进行全局操作,如读/写外部系统的偏移量,保存所有 key 的全局聚合等。比如,源(Source)操作符在 Flink 中经常使用 Operator State,以保存并恢复读取的位置。
一个重要的特性是,Flink 可以将 Operator State 分布在操作符的所有并行实例中。这意味着,当你的作业需要重新平衡(例如,操作符的并行度改变)时,Flink 可以通过特定的分布和复制策略(只复制,广播等)重新分配 Operator State。

OperatorState目前只支持使用ListState。

下述提供了一个案例方法,在算子中自定义OperatorState状态并实现自定义的快照逻辑和状态初始化逻辑:

public class MySinkFunction implements SinkFunction<Long>, CheckpointedFunction {private ListState<Long> listState;@Overridepublic void invoke(Long value, Context context) throws Exception {// 用户自定义业务逻辑listState.add(value);}@Override// Operator级别的State需要用户来实现快照保存逻辑public void snapshotState(FunctionSnapshotContext context) throws Exception {Iterable<Long> state = listState.get();System.out.println("Snapshot State: ");for(Long s : state){System.out.println(s);}}@Override// Operator级别的State需要用户来实现状态的初始化		public void initializeState(FunctionInitializationContext context) throws Exception {listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("listState",TypeInformation.of(new TypeHint<Long>() {})));// Restore Stateif (context.isRestored()) {System.out.println("Restored State: ");for(Long s : listState.get()){System.out.println(s);}}}
}

状态描述(StateDescriptor)

State既然是暴露给用户的,那么就有一些属性是可以用户自定义设置的,如State名称,类型,序列化/反序列化器,State过期时间等。在Flink中对State状态描述称为StateDescriptor,是状态的元数据描述:

  • 类型信息:StateDescriptor中的类型信息用来告诉Flink这个状态的具体类型,这在序列化和反序列化状态时需要用到。Flink需要知道如何正确地读取和写入状态数据,以保证其正确性和一致性。
  • 状态名字:状态的名字在Flink中是唯一的,它用于在状态后端(StateBackend)中区分和查找不同的状态。
  • 默认值:对于某些类型的状态(比如 ValueState),StateDescriptor 也可以包含一个默认值。当尝试获取一个不存在的状态时,Flink 会返回这个默认值。

当从状态后端StateBackend读取State时,为什么是通过StateDescriptor来获取,而不能仅仅根据名称来获取?

Flink为什么不仅仅使用名字来获取状态,原因就在于以上关于统一性和类型安全的考虑。仅仅靠名称,Flink无法知道应该如何正确地序列化和反序列化状态,也无法给用户提供方便和牢靠的类型安全保证。另外,加入默认值这样的额外配置也会使状态的使用变得更加灵活。

StateDescriptor的体系结构图如下所示:

对应于每一类State,Flink内部都设计了对应的StateDescriptor,在任务使用State的地方,都需要通过StateDescriptor描述状态的信息。

运行时,在RichFunction和ProcessFunction中,通过RuntimeContext上下文对象,使用StateDescriptor从状态后端(StateBackend)中获取实际的State实例,然后在开发者编写的UDF中就可以使用这个State了。StateBackend中有对应则返回现有的State,没有则创建新的State。

    public class CountWithState extends RichMapFunction<Long, Long> {// ValueState 对象,保存当前的计数private transient ValueState<Long> countState;@Overridepublic void open(org.apache.flink.configuration.Configuration parameters) throws Exception {// 定义状态描述符ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", // 状态名称TypeInformation.of(Long.class), // 状态类型0L); // 默认值// 用描述符从运行时上下文获取状态countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(Long value) throws Exception {Long currCount = countState.value();currCount += value;// 更新状态countState.update(currCount);return currCount;}
}

状态存储(StateBackend)

Flink中无论是哪种类型的State,都需要被持久化到可靠存储中,才具备应用级的容错能力,State的存储在Flink中被称为StateBackend。StateBackend需要具备如下两种能力。

  • 在计算过程中中提供访问State的能力,开发者在编写业务逻辑中能够使用StateBackend的接口读写数据。
  • 能够将State持久化到外部存储,提供容错能力。

根据使用场景的不同,Flink内置了3种StateBackend,其体系结构如下所示:

  • 纯内存:MemoryStateBackend,适用于验证、测试、不推荐生产环境使用。
  • 内存+文件:FsStateBackend,适用于长周期大规模的数据。
  • RocksDB:RocksDBStateBackend,适用于长周期大规模的数据。

这3种State的关系如下所示:
在这里插入图片描述

在运行时,MemoryStateBackend和FsStateBackend本地的State都保存在TaskManager的内存中,所以其底层都依赖于HeapKeyedStateBackend。HeapKeyedStateBackend是面向Flink引擎内存的,使用者无须感知。

MemoryStateBackend和FsStateBackend

MemoryStateBackend和FsStateBackend状态存储都依赖于内存保存运行时的状态State,即它们的State都是加载到内存在运行时使用的,区别只在于当State持久化时保存的位置。

MemoryStateBackend

内存型StateBackend在Flink中叫作MemoryStateBackend,运行时所需要的State数据保存在TaskManager JVM堆上内存中,KV类型的State,窗口算子的State使用HashTable来保存数据、触发器等。执行检查点时,会把State的快照数据保存到JobManager进程的内存中。MemoryStateBackend可以使用异步的方式进行快照(也可以使用同步的方式)。推荐使用异步的方式,以避免阻塞算子处理数据。

基于内存的StateBackend不建议在生产环境下使用,可以在本地开发调试时使用。
注意点如下:

  • State存储在JobManager内存中,受限于JobManager的内存大小。
  • 每个State默认5MB,可通过MemoryStateBackend构造函数调整。
  • 每个State不能超过Akka Frame大小。

为什么Flink的MemoryStateBackend将快照的大小限制在 Akka Frame Size(默认值为 4 MB)以下?

这是因为Flink的Checkpoint过程是依赖于Akka的Actor Model进行通信的。在Snapshot的过程中,需要将数据发送给JobManager,这个发送过程是通过Akka的消息传递完成的。
对于Akka框架来说,发送的消息大小默认是有限制的,即4MB,这是为了保护系统。如果没有设置消息大小,那么一个大的消息可能会消耗大量的内存或者带宽,这是不可预见的,也可能引起系统负载问题或者导致服务拒绝。因此,Akka框架设置了这个默认限制来避免过大的消息。
因此,对于MemoryStateBackend,由于它直接将快照数据存储到JobManager内存中,需要通过Akka进行通信,这就是为什么它的快照大小被限制在 Akka Frame Size以内。

FsStateBackend

文件型StateBackend在Flink中被称作FsStateBackend,运行时所需要的State数据保存在TaskManager内存中,执行检查点时,会把State的快照数据持久化到配置的文件系统中。这使得 FsStateBackend 可以管理更大量的状态,并且即便在任务管理器失效后也可以从外部系统恢复状态。但它的快照操作可能会受磁盘速率的影响。可以使用分布式文件系统或本地文件系统,如使用HDFS的路径为:hdfs://namenode:40010/flink/checkpoints/...,使用本地文件系统的路径为:file:///data/flink/checkpoints/...

FsStateBackend可以使用在处理大状态、长窗口、或大键值状态的有状态处理任务,同时也非常适合用于高可用方案。

需要注意的点:

  • State数据首先会被存储在TaskManager的内存中。
  • State大小不能超过TaskManager内存。
  • TaskManager异步地将State数据写入外部存储。

基于RocksDB的StateBackend

RocksDBStateBackend跟上面两种StateBackend类型不同,其利用Google的RocksDB作为本地嵌入式数据库,将数据流计算的状态State存储在本地磁盘中,不会受限于TaskManager的内存大小。在执行检查点时,再将整个RocksDB中保存的State数据全量或者增量持久化到配置的文件系统中,这一过程中,会在JobManager的内存中存储少量的检查点的元数据信息。

RocksDB客服了State受内存限制的问题,同时又能持久化到远端的文件系统中,同样也适合在生产中使用。但是RocksDBStateBackend状态数据是存储在磁盘中的,相比如直接在内存中操作状态数据,读写性能肯定是会慢很多的,因此IO可能成为任务的瓶颈,导致数据流的吞吐量剧烈下降。

适用场景:

  • 最适合处理大状态、大窗口,或者大键值状态的有状态处理任务。
  • RocksDBStateBackend非常适合用于高可用方案。
  • RocksDBStateBackend是目前唯一支持增量检查点的StateBackend。增量检查点非常适用于超大状态的场景。

需要注意的点:

  • 总State大小仅限于磁盘大小,不受内存限制。
  • RocksStateBackend也需要配置外部文件系统,集中保存State。
  • RocksDB的JNI API基于byte数组,单key和单value的大小不能超过2的31次方字节。

状态持久化 (State Snapshot)

StateBackend中的数据最终需要持久化到第三方存储中,确保集群故障或者作业故障能够恢复,针对不同类型的快照策略如图所示:。

HeapSnapshotStrategy策略对应于HeapKeyedStateBackend,RocksDBStateBackend的持久化策略有两种:全量持久化策略(RocksFullSnapshotStrategy)和增量化持久策略(RocksIncementalSnapshotStrategy)。

全量持久化策略

全量持久化,也就是说每次把全量的State写入到状态存储中(如HDFS)。内存型、文件型、RocksDB类型的StateBackend都支持全量持久化策略。内存型和文件型的StateBackend依赖于HeapKeyedStateBackend,HeapKeyedStateBackend使用StateTable存储结构来存储数据。
在执行持久化存储策略的时候,使用异步机制,每个算子启动1个独立的线程,将自身的状态写入分布式存储中。在做持久化的过程中,状态可能会被持续修改,因为,Flink使用了CopyOnWriteStateTable来保证线程安全,RocksDBStateBackend则使用RockDB的快照机制,使用快照来保证线程安全。

这里简单介绍一下CopyOnWrite机制,以及Flink为什么在状态持久化时要使用CopyOnWrite机制。

CopyOnWrite 是一种优化策略,主要用于在面对频繁读取操作与偶尔需要执行的写入操作的环境中提高性能和线程安全。"Copy-on-write"的含义是只有在需要修改数据时才复制这些数据,然后在复制数据上进行修改,这样就不会影响其他线程正在读这些数据的活动。

当执行状态持久化时,会有一个独立线程来执行持久化操作,而算子的线程还会执行处理数据和写入状态的逻辑。因此,如果不加以处理,则持久化线程对状态State的读和算子线程对状态的写可能就会出现冲突,导致线程不安全。为此, Flink引入了CopyOnWrite机制,当算子线程对State状态写入时,会创建一个当前状态State的副本来写入,写入完成后,将指向原状态的指针重新指向新的状态副本,在这个过程中,修改操作是原子性的,因此新来的读请求总是能看到一致的状态(要么是旧状态,要么是新状态)。因此,在这种模式下,持久化和算子处理彼此互不影响,在能够提升线程安全的情况下,提升算子处理性能,降低持久化操作对算子本身处理任务的影响。

增量持久化策略

增量持久化就是每次持久化增量的State,只有RocksDBStateBackend支持增量持久化。
Flink增量式的检查点以RocksDB为基础,RocksDB是一个基于LSM-Tree的KV存储,新的数据保存在内存中,成为memtable。如果Key相同,后到的数据将覆盖之前的数据,一旦memtable写满了,RocksDB就会将数据压缩并写入到磁盘。memtable的数据持久化到磁盘后,就变成了不可变的sstable。
因为sstable是不可变的,Flink对比前一个检查点创建和删除的RocksDB sstable文件就可以计算出状态有哪些改变。为了确保sstable是不可变的,Flink会在RocksDB上触发刷新操作,强制将memtable刷新到磁盘上。在Flink执行检查点时,会将新的sstable持久化到存储中(如HDFS等),同时保留引用。这个过程中Flink并不会持久化本地所有的sstable,因为本地的一部分历史sstable在之前的检查点中已经持久化到存储中了,只需要增加对sstable文件的引用次数就可以。

RocksDB会在后台合并sstable并删除其中重复的数据。然后在RocksDB删除原来的sstable,替换成新合成的sstable,新的sstable包含了被删除的sstable中的信息。通过合并,历史的sstable会合并成一个新的sstable,并删除这些历史sstable,可以减少检查点的历史文件,避免大量小文件的产生。

总结

计算分为无状态计算和有状态计算两类。无状态计算不需要容错,有状态计算则必须有容错机制,这就是State的作用。在Flink中从StateBackend中获取State时需要使用StateDescriptor,StateDescriptor保存了状态的元数据信息,提供了读取State时需要的诸如名称、默认值、序列化方式等信息。
状态最终需要能够持久化到外部存储才能有效实现容错,Flink提供了三种StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。前两种都是基于HeapKeyedStateBackend来实现的,因为它们都依赖于内存保存运行时的状态数据(存储在TaskManager的堆内存中)。RocksDBStateBackend则使用RocksDB作为状态后端,状态不受内存大小的限制,但是IO可能会成为性能瓶颈。

最后,Flink在执行持久化操作时,会启动一个独立线程来执行,并利用CopyOnWrite机制,使用CopyOnWriteStateTable来保存持久化过程中的线程安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2145.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python线性混合效应回归LMER分析大鼠幼崽体重数据、假设检验可视化|数据分享...

全文链接&#xff1a;https://tecdat.cn/?p38816 在数据分析领域&#xff0c;当数据呈现出层次结构时&#xff0c;传统的一般线性模型&#xff08;GLM&#xff09;可能无法充分捕捉数据的特征。混合效应回归作为GLM的扩展&#xff0c;能够有效处理这类具有层次结构的数据&…

大疆机场及无人机上云

最近基于大疆上云api进行二次开发&#xff0c;后面将按照开发步骤对其进行说明&#xff01;

【WEB】网络传输中的信息安全 - 加密、签名、数字证书与HTTPS

文章目录 1. 概述2. 网络传输安全2.1.什么是中间人攻击2.2. 加密和签名2.2.1.加密算法2.2.2.摘要2.2.3.签名 2.3.数字证书2.3.1.证书的使用2.3.2.根证书2.3.3.证书链 2.4.HTTPS 1. 概述 本篇主要是讲解讲一些安全相关的基本知识&#xff08;如加密、签名、证书等&#xff09;&…

SpringMVC

开发模式&#xff1a; &#xff08;1&#xff09;前后端不分离&#xff1a;服务端渲染 数据和结构并不分离&#xff0c;客户端发送请求后访问指定路径资源&#xff0c;服务端业务处理之后将数据组装到页面&#xff0c;并返回带数据的完整页面。 &#xff08;2&#xff09;前…

uni-app编写微信小程序使用uni-popup搭配uni-popup-dialog组件在ios自动弹出键盘。

uni-popup-dialog 对话框 将 uni-popup 的type属性改为 dialog&#xff0c;并引入对应组件即可使用对话框 &#xff0c;该组件不支持单独使用 示例 <button click"open">打开弹窗</button> <uni-popup ref"popup" type"dialog"…

UML系列之Rational Rose笔记九:组件图

一、新建组件图 二、组件图成品展示 三、工作台介绍 最主要的还是这个component组件&#xff1b; 然后还有这几个&#xff0c;正常是用不到的&#xff1b;基本的使用第四部分介绍一下&#xff1a; 四、基本使用示例 这些&#xff0c;主要是运用package还有package specifica…

数据结构《MapSet哈希表》

文章目录 一、搜索树1.1 定义1.2 模拟实现搜索 二、Map2.1 定义2.2 Map.Entry2.3 TreeMap的使用2.4 Map的常用方法 三、Set3.1 定义3.2 TreeSet的使用3.3 Set的常用方法 四、哈希表4.1 哈希表的概念4.2 冲突4.2.1 冲突的概念4.2.2 冲突的避免1. 选择合适的哈希函数2. 负载因子调…

赛灵思(Xilinx)公司Artix-7系列FPGA

苦难从不值得歌颂&#xff0c;在苦难中萃取的坚韧才值得珍视&#xff1b; 痛苦同样不必美化&#xff0c;从痛苦中开掘出希望才是壮举。 没有人是绝对意义的主角&#xff0c; 但每个人又都是自己生活剧本里的英雄。滑雪&#xff0c;是姿态优雅的“贴地飞行”&#xff0c;也有着成…

qt vs ios开发应用环境搭建和上架商店的记录

qt 下载链接如下 https://download.qt.io/new_archive/qt/5.14/5.14.2/qt-opensource-mac-x64-5.14.2.dmg 安装选项全勾选就行&#xff0c;这里特别说明下qt5.14.2/qml qt5.14.2对qml支持还算成熟&#xff0c;但很多特性还得qt6才行&#xff0c;这里用qt5.14.2主要是考虑到服…

JavaSE学习心得(反射篇)

反射 前言 获取class对象的三种方式 利用反射获取构造方法 利用反射获取成员变量 利用反射获取成员方法 练习 保存信息 跟配置文件结合动态创建 前言 接上期文章&#xff1a;JavaSE学习心得&#xff08;多线程与网络编程篇&#xff09; 教程链接&#xff1a;黑马…

FPGA 串口与HC05蓝牙模块通信

介绍 关于接线&#xff1a;HC-05蓝牙模块一共有6个引脚&#xff0c;但经过我查阅资料以及自己的实操&#xff0c;实际上只需要用到中间的4个引脚即可&#xff08;即RXD,TXD,GND,VCC&#xff09;。需要注意的是&#xff0c;蓝牙模块的RXD引脚需要接单片机的TXD引脚&#xff0c;同…

基于CiteSpace的知网专利文献计量分析与可视化

CiteSpace是一款可视化学术文献分析软件&#xff0c;它可以帮助用户分析和可视化研究领域的文献数据。适用于分析大量文献数据&#xff0c;例如由 Web of Science、Scopus 和知网等学术数据库生成的数据。图为来自CiteSpace的成图&#xff0c;是不是很美观&#xff1f;接下来我…

Gitee图形界面上传(详细步骤)

目录 1.软件安装 2.安装顺序 3.创建仓库 4.克隆远程仓库到本地电脑 提交代码的三板斧 1.软件安装 Git - Downloads (git-scm.com) Download – TortoiseGit – Windows Shell Interface to Git 2.安装顺序 1. 首先安装git-2.33.1-64-bit.exe&#xff0c;顺序不能搞错2. …

深入了解生成对抗网络(GAN):原理、实现及应用

生成对抗网络&#xff08;GAN, Generative Adversarial Networks&#xff09;是由Ian Goodfellow等人于2014年提出的一种深度学习模型&#xff0c;旨在通过对抗训练生成与真实样本相似的数据。GAN在图像生成、图像修复、超分辨率等领域取得了显著的成果。本文将深入探讨GAN的基…

Git的基本命令以及其原理(公司小白学习)

从 Git 配置、代码提交与远端同步三部分展开&#xff0c;重点讲解 Git 命令使用方式及基本原理。 了解这些并不是为了让我们掌握&#xff0c;会自己写版本控制器&#xff0c;更多的是方便大家查找BUG&#xff0c;解决BUG &#xff0c;这就和八股文一样&#xff0c;大多数都用…

信号与系统初识---信号的分类

文章目录 0.引言1.介绍2.信号的分类3.关于周期大小的求解4.实信号和复信号5.奇信号和偶信号6.能量信号和功率信号 0.引言 学习这个自动控制原理一段时间了&#xff0c;但是只写了一篇博客&#xff0c;其实主要是因为最近在打这个华数杯&#xff0c;其次是因为在补这个数学知识…

【初识扫盲】厚尾分布

厚尾分布&#xff08;Fat-tailed distribution&#xff09;是一种概率分布&#xff0c;其尾部比正态分布更“厚”&#xff0c;即尾部的概率密度更大&#xff0c;极端值出现的概率更高。 一、厚尾分布的特征 尾部概率大 在正态分布中&#xff0c;极端值&#xff08;如距离均值很…

--- 多线程编程 基本用法 java ---

随着时代的发展&#xff0c;单核cpu的发展遇到了瓶颈&#xff0c;而要提高算力就要发展多核cpu&#xff0c;他能允许多个程序同时运行&#xff0c;这时并发编程他能利用到多核的优势&#xff0c;于是就成为了时代所趋了 其实多进程编程也能进行实现并发编程&#xff0c;只不过…

Linux网络_套接字_UDP网络_TCP网络

一.UDP网络 1.socket()创建套接字 #include<sys/socket.h> int socket(int domain, int type, int protocol);domain (地址族): AF_INET网络 AF_UNIX本地 AF_INET&#xff1a;IPv4 地址族&#xff0c;适用于 IPv4 协议。用于网络通信AF_INET6&#xff1a;IPv6 地址族&a…

idea分支合并代码

步骤一 首先把两个分支的代码都提交了&#xff0c;保持和远程仓库一致&#xff0c;不要有任何没提交的代码。如果一些程序的yml配置文件&#xff0c;不想提交&#xff0c;可以复制一个&#xff0c;不受git管理。如果有没有提交的代码&#xff0c;合并分支的时候就会提示那些代…