【Flink入门修炼】2-2 Flink State 状态

  • 什么是状态?状态有什么作用?
  • 如果你来设计,对于一个流式服务,如何根据不断输入的数据计算呢?
  • 又如何做故障恢复呢?

一、为什么要管理状态

流计算不像批计算,数据是持续流入的,而不是一个确定的数据集。在进行计算的时候,不可能把之前已经输入的数据全都保存下来,然后再和新数据合并计算。效率低下不说,内存也扛不住。
另外,如果程序出现故障重启,没有之前计算过的状态保存,那么也就无法再继续计算了。

因此,就需要一个东西来记录各个算子之前已经计算过值的结果,当有新数据来的时候,直接在这个结果上计算更新。这个就是状态

常见的流处理状态功能如下:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

二、state 简介

Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
image.png

状态的保存:
需要考虑的问题:

  • container 异常后,状态不丢
  • 状态可能越来越大

因此,状态不能直接放在内存中,以上两点问题都无法保证。
需要有一个外部持久化存储方式,常见的如放到 HDFS 中。(此部分读者感兴趣可自行搜索资料探索)

三、Flink 状态类型

image.png

一)Managed State 和 Raw State

  • Managed State 是由 Flink 管理的。Flink帮忙存储、恢复和优化。
  • Raw State 是开发者自己管理的,需要自己序列化(较少用到)。

在 Flink 中推荐用户使用Managed State管理状态数据 ,主要原因是 Managed State 能够更好地支持状态数据的重平衡以及更加完善的内存管理。

Managed StateRaw State
状态管理方式Flink Runtime 管理,自动存储,自动恢复,内存管理方式上优化明显用户自己管理,需要用户自己序列化
状态数据结构已知的数据结构 value , list ,mapflink不知道你存的是什么结构,都转换为二进制字节数据
使用场景大多数场景适用需要满足特殊业务,自定义operator时使用,flink满足不了你的需求时候,使用复杂

下文将重点介绍Managed State。

二)Keyed State 和 Operator State

Managed State 又有两种类型:Keyed State 和 Operator State。

keyed stateoperator state
适用场景只能应用在 KeyedSteam 上可以用于所有的算子
State 处理方式每个 key 对应一个 state,一个 operator 处理多个 key ,会访问相应的多个 state一个 operator 对应一个 state
并发改变并发改变时,state随着key在实例间迁移并发改变时需要你选择分配方式,内置:1.均匀分配 2.所有state合并后再分发给每个实例
访问方式通过RuntimeContext访问,需要operator是一个richFunction需要你实现CheckPointedFunction或ListCheckPointed接口
支持数据结构ValuedState, ListState, Reducing State, Aggregating State, MapState, FoldingState(1.4弃用)只支持 listState
Keyed State

简单来说,通过 keyBy 分组的就会用到 Keyed State。就是按照分组来的状态。(Keyed State 是Operator State的特例,区别在于 Keyed State 事先按照 key 对数据集进行了分区,每个 Key State 仅对应ー个Operator和 Key 的组合。)
image.png

Keyed State可以通过 Key Groups 进行管理,主要用于当算子并行度发生变化时,自动重新分布Keyed State数据 。分配代码如下:

// KeyGroupRangeAssignment.javapublic static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}
Operator State

Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。
image.png

三)Flink 实现类

image.png

在开发中,需要保存的状态也有不同的数据结构,那么 Flink 也提供了相应的类。
如上图所示:

  • ValueState[T] 保存单一变量状态
  • MapState[K, V] 同 java map,保存 kv 型状态
  • ListState[T] 数组类型状态
  • ReducingState[T] 单一状态,将原状态和新状态合并后再更新
  • AggregatingState[IN, OUT] 同样是合并更新,只不过前后数据类型可以不一样

四、实践

实现一个简单的计数窗口。
输入数据是一个元组 Tuple2.of(1L, 3L),把元组的第一个元素当作 key(在示例中都 key 都是 “1”),第二个元素当 value。
该函数将出现的次数以及总和存储在 ValueState 中。 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// access the state valueTuple2<Long, Long> currentSum = sum.value();// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)

四、小结

本节我们介绍了 Flink 状态,是用于流式计算中中间数据存储和故障恢复的。
Flink 状态分为 Raw State 和 Manage State,其中 Manage State 中又包含 Keyed State 和 Operator State。最重要的是 Keyed State 要重点理解和掌握。
在编程开发过程中,针对不同的数据结构,Flink 提供了对应的 State 类。并提供了一个 state demo 代码供学习。


参考文章:
七、Flink入门–状态管理_flink流式任务如何保证7*24小时运行-CSDN博客
Flink状态管理详解:Keyed State和Operator List State深度解析
爆肝 3 月,3w 字、15 章节详解 Flink 状态管理!(建议收藏)-腾讯云开发者社区-腾讯云(较详细)
Flink 笔记二 Flink的State–状态原理及原理剖析_flink key state是每个key对应一个state还是每个分区对应一个state-CSDN博客(源码剖析)
Flink 状态管理详解(State TTL、Operator state、Keyed state)-腾讯云开发者社区-腾讯云
Flink 源码阅读笔记(10)- State 管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/270145.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络编程(3/4)

广播 ​ #include<myhead.h>int main(int argc, const char *argv[]) {//1、创建套接字int sfd socket(AF_INET, SOCK_DGRAM, 0);if(sfd -1){perror("socket error");return -1;}//2、将套接字设置成允许广播int broadcast 1;if(setsockopt(sfd, SOL_SOC…

javascript基础入门

1.第一个javascript程序 javascript程序不能够独立的运行&#xff0c;必须依赖于HTML文件&#xff0c;type属性值用来说明脚本的类型&#xff0c;这里 是指使用javascript编写的文本文件&#xff1b; 2.alert警告框 alert&#xff08;&#xff09;函数显示一条指定的信息&am…

Vue router文件中本地路由配置使用i18n【解决tab名称出现undefined,导致i18n没有实现问题】

问题 点击按钮 跳转详情页后 tab名称出现错误&#xff0c;报 undefined ## 需求 点击工单详情按钮&#xff0c;跳转详情页面&#xff08;新页面&#xff09;&#xff0c;新页面tab栏名称 还是为 工单出库&#xff0c;但要求工单出库文字配置为多语言&#xff0c;使用i18n来配置…

[云原生] K8s之pod控制器详解

Pod 是 Kubernetes 集群中能够被创建和管理的最小部署单元。所以需要有工具去操作和管理它们的生命周期,这里就需要用到控制器了。 Pod 控制器由 master 的 kube-controller-manager 组件提供&#xff0c;常见的此类控制器有 Replication Controller、ReplicaSet、Deployment、…

SQOOP安装与使用

SQOOP安装及使用 文章目录 SQOOP安装及使用SQOOP安装1、上传并解压2、修改配置文件3、修改环境变量4、添加MySQL连接驱动5、测试 准备MySQL数据登录MySQL数据库创建student数据库切换数据库并导入数据另外一种导入数据的方式使用Navicat运行SQL文件导出MySQL数据库 importMySQL…

购买使用静态住宅代理IP前,你需要测试的5件事

静态住宅代理IP&#xff0c;是一种在网络通信过程中提供固定IP地址的代理服务。与动态代理IP相比&#xff0c;静态代理IP提供的是持久且不变的IP地址。这种稳定性使得静态代理IP在需要长期稳定网络身份的场景中&#xff0c;如跨境电商/社媒养号、网络监控、品牌保护、长期数据爬…

安卓使用ExoPlayer出现膨胀类异常

1.导包 implementation com.google.android.exoplayer:exoplayer-core:2.15.1implementation com.google.android.exoplayer:exoplayer-ui:2.15.1 2.在Androidifest.xml加入权限&#xff0c;我这里加了网络与读写权限 <uses-permission android:name"android.permissio…

windows中使用nnUNet的nnUNet_convert_decathlon_task提示路径不对

找到问题并且解决解决办法 报错时候的指令 nnUNet_convert_decathlon_task -i D:\桌面\nnUNet\DATASET\nnUNet_raw\nnUNet_raw_data\Task05_Prostate 修改为 nnUNet_convert_decathlon_task -i D:/桌面/nnUNet/DATASET/nnUNet_raw/nnUNet_raw_data/Task05_Prostate 修改点&…

H5双人五子棋小游戏

H5小游戏源码、JS开发网页小游戏开源源码大合集。无需运行环境,解压后浏览器直接打开。有需要的,私信本人,发演示地址,可以后再订阅,发源码,含60+小游戏源码。如五子棋、象棋、植物大战僵尸、开心消消乐、扑鱼达人、飞机大战等等 <!DOCTYPE html> <html> <…

移动开发:网格视图

一、在新建GridView模块下添加图片以及创建cell.xml文件 1.粘贴图片时选择红框中的路径&#xff0c;点击“OK” 2.在路径后添加-mdpi后缀,再点击“OK” 二、相关代码块 1.MainActivity.java文件代码 package com.example.gridview;import androidx.appcompat.app.AppCompatAc…

备考2024年北京高考数学:20114~2023十年选择题练习和解析

距离2024年高考还有三个月的时间&#xff0c;如何用三个月的时间再提高北京数学高考的成绩&#xff1f;吃透历年真题以及背后的知识点是行之有效的方法 之一。 今天我们来看一下2014-2023年的北京市高考数学的选择题&#xff0c;从过去十年&#xff08;2014-2023&#xff09;的…

面试问答总结之并发编程

文章目录 &#x1f412;个人主页&#xff1a;信计2102罗铠威&#x1f3c5;JavaEE系列专栏&#x1f4d6;前言&#xff1a;&#x1f380;多线程的优点、缺点&#x1f415;并发编程的核心问题 &#xff1a;不可见性、乱序性、非原子性&#x1fa80;不可见性&#x1fa80;乱序性&am…

【真机Bug】异步加载资源未完成访问单例导致资源创建失败

1.错误表现描述 抽卡时&#xff0c;10抽展示界面为A。抽取内容可能是整卡或者碎片&#xff0c;抽到整卡&#xff0c;会有立绘展示和点击详情的按钮。点击详情后出现详情页B。【此时界面A预制体被销毁&#xff0c;卡片数据进入数据缓存池】点击页面B的返回按钮&#xff0c;单例…

刚刚,OpenAI官方发文驳斥马斯克,自曝8年间邮件往来截图

文章开篇表示&#xff1a;「OpenAI 的使命是确保 AGI 惠及全人类&#xff0c;这意味着既要构建安全、有益的 AGI&#xff0c;又要帮助创造广泛的利益。我们正在分享我们在实现使命方面所学到的知识&#xff0c;以及有关我们与马斯克关系的一些事实。我们打算驳回马斯克的所有主…

回归啦!!!

消失的日子在实习&#xff0c;今天最后一天了来看看自己的学习日志&#xff0c;有没有可以和小伙伴交流的部分吧&#xff01; 目录 一、产品one ①简介 ②底层原理 ③知识点一 作用一&#xff1a;日志采集 作用二&#xff1a;实时监测 作用三&#xff1a;规则匹配 作用…

Word中Endnote加载项不见了怎么处理?

打开word-①文件-②选项-③加载项 勾选endnote cite while you write 完成上述操作后&#xff0c;endnote便出现在菜单栏中。

[C语言]——分支和循环(2)

目录 一.逻辑操作符&#xff1a;&& , || , &#xff01; 1.逻辑取反运算符! 2.与运算符&& 3.或运算符 4.练习&#xff1a;闰年的判断 5.短路 二.switch语句 1.if语句和switch语句的对比 2.switch语句中的break 3.switch语句中的default 4.switch…

【C++】类的默认成员函数(上)

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 文章目录 一、默认成员函数二、构造函数构造函数的概念及特性 三、析构函数析构函数的特性…

论文笔记:Compact Multi-Party Confidential Transactions

https://link.springer.com/chapter/10.1007/978-3-030-65411-5_21 A compact, private, Multi-Party Confidential Transactions (MCT) 紧凑型多方机密交易&#xff08;Compact MCT&#xff09;&#xff1a;MCT的长度与常规的单一所有者交易一样短&#xff1b;换句话说&…

【大数据架构(3)】Lambda vs. Kappa Architecture-选择你需要的架构

文章目录 一. Data Processing Architectures1. Lambda Architecture1.1. 架构说明a. Data Ingestion Layerb. Batch Layer (Batch processing)c. Speed Layer (Real-Time Data Processing)d. Serving Layer 1.2. Lambda Architecture的优缺点1.3. 使用案例 2. Kappa Architect…