Flink四大基石之CheckPoint

1、State Vs Checkpoint

State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

2、设置Checkpoint

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

测试代码效果:启动本地的nc, 启动hdfs服务。

启动代码,发现有权限问题:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x

解决方案:

System.setProperty("HADOOP_USER_NAME", "root");在设置检查点之前,设置一句这样带权限的语句,如果是集群运行,不存在该问题。可以不设置!!!

查看快照情况:

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

 

启动HDFS、Flink

[root@hadoop10 app]#start-dfs.sh
[root@hadoop10 app]#start-cluster.sh

数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

第一次运行的时候 

在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!

 通过nc -lk 9999 输入以下内容:

 

想查看运行结果,可以通过使用的slot数量判断一下:

 取消flink job的运行

 查看一下这次的单词统计到哪个数字了:

第二次运行的时候 

flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

 

 观察数据:输入一个hello,1 得到新的结果hello,8

3、重启策略

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

就是一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

 进行wordcount时,输入了一个bug,1 人为触发异常。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

程序中添加log4J的代码:

# Global logging configuration
#  Debug   info   warn  error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

 

那为什么开启检查点之后,报错了程序还在运行?因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)

//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启");   //最后一句execute可以设置jobName,显示在8081界面

 程序如果上传至服务器端运行,可以看到重启状态

完整代码如下:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

在本地测试,是没有办法看到重试机制的现象的,需要打包上传至集群,特别注意:使用的类名到底是哪一个。

4、savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1.提交一个flink job --提交的还是重启策略的代码打成的jar包

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar

2.输入一些数据,观察单词对应的数字的变化

3.执行savepoint操作

以下是 -->  停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -->  不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint

备注:如何正确停止一个 flink 的任务

flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)

4.查看最近完成的flink job对应的savepoint

5.根据之前的savepoint路径,重新启动flink job

flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar
Job has been submitted with JobID 3766ec9ff6f34b46376493a04b50a1f4

 再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:

 

这个图形化界面,跟我们使用如下命令是一个效果:

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/481872.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于TensorFlow的手写体数字识别训练与测试

需求&#xff1a; 选择一个最简单的细分方向&#xff0c;初步了解AI图像识别的训练、测试过程TensorFlow、PyTorch、c&#xff0c;三种代码方案&#xff0c;先从TensorFlow入手探讨最基本问题的优化问题 总结&#xff1a; 基于TensorFlow的python代码库自带了mnist 训练数据…

YOLO系列论文综述(从YOLOv1到YOLOv11)【第11篇:YOLO变体——YOLO+Transformers、DAMO、PP、NAS】

YOLO变体 1 DAMO-YOLO2 PP-YOLO, PP-YOLOv2, and PP-YOLOE2.1 PP-YOLO数据增强和预处理2.2 PP-YOLOv22.3 PP-YOLOE 3 YOLO-NAS4 YOLO Transformers5 YOLOv1-v8及变体网络结构总结 YOLO系列博文&#xff1a; 【第1篇&#xff1a;概述物体检测算法发展史、YOLO应用领域、评价指标…

SE16N 外键校验报错问题

问题&#xff1a; SE16N维护时&#xff0c;偶尔有一些莫名奇妙的校验报错&#xff0c;条目XX在表XX中不存在&#xff0c;但是实际数据时存在的。 分析&#xff1a; DEBUG过程中&#xff0c;定位到数据校验部分&#xff0c;发现当外键定义的关联字段中存在某些不在对应维护表中…

【数据结构】二叉搜索树(二叉排序树)

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;数据结构 目录 前言 一、什么是二叉搜索树 二、二叉搜索树的实现 节点 属性和接口的声明 插入 查找 删除 拷贝构造 析构 中序遍历 三、二叉搜索树的…

【接口自动化测试】一文从3000字从0到1详解接口测试用例设计

接口自动化测试是软件测试中的一种重要手段&#xff0c;它能有效提高测试效率和测试覆盖率。在进行接口自动化测试之前&#xff0c;首先需要进行接口测试用例的设计。本文将从0到1详细且规范的介绍接口测试用例设计的过程&#xff0c;帮助读者快速掌握这一技能。 一、了解接口…

使用 PDF API 合并 PDF 文件

内容来源&#xff1a; 如何在 Mac 上合并 PDF 文件 1. 注册与认证 您可以注册一个免费的 ComPDFKit API 帐户&#xff0c;该帐户允许您在 30 天内免费无限制地处理 1,000 多个文档。 ComPDFKit API 使用 JSON Web Tokens 方法进行安全身份验证。从控制面板获取您的公钥和密钥&…

微服务即时通讯系统的实现(服务端)----(2)

目录 1. 语音识别子服务的实现1.1 功能设计1.2 模块划分1.3 模块功能示意图1.4 接口的实现 2. 文件存储子服务的实现2.1 功能设计2.2 模块划分2.3 模块功能示意图2.4 接口的实现 3. 用户管理子服务的实现3.1 功能设计3.2 模块划分3.3 功能模块示意图3.4 数据管理3.4.1 关系数据…

Scala—列表(可变ListBuffer、不可变List)用法详解

Scala集合概述-链接 大家可以点击上方链接&#xff0c;先对Scala的集合有一个整体的概念&#x1f923;&#x1f923;&#x1f923; 在 Scala 中&#xff0c;列表&#xff08;List&#xff09;分为不可变列表&#xff08;List&#xff09;和可变列表&#xff08;ListBuffer&…

Android 系统之Init进程分析

1、Init进程流程 2、Init细节逻辑 2.1 Init触发shutdown init进程触发系统重启是一个很合理的逻辑&#xff0c;为什么合理&#xff1f; init进程是android世界的一切基石&#xff0c;如果android世界的某些服务或者进程出现异常&#xff0c;那么会导致整个系统无法正常使用…

NVR录像机汇聚管理EasyNVR多个NVR同时管理基于B/S架构的技术特点与能力应用

EasyNVR视频融合平台基于云边端协同设计&#xff0c;能够轻松接入并管理海量的视频数据。该平台兼容性强、拓展灵活&#xff0c;提供了视频监控直播、录像存储、云存储服务、回放检索以及平台级联等一系列功能。B/S架构使得EasyNVR实现了视频监控的多元化兼容与高效管理。 其采…

使用ffmpeg命令实现视频文件间隔提取帧图片

将视频按每隔五秒从视频中提取一张图片 使用 ffmpeg 工具&#xff0c;通过设置 -vf&#xff08;视频过滤器&#xff09;和 -vsync 选项 命令格式 ffmpeg -i input_video.mp4 -vf "fps1/5" output_%03d.png 解释&#xff1a; -i input_video.mp4&#xff1a;指定输…

Java安全—原生反序列化重写方法链条分析触发类

前言 在Java安全中反序列化是一个非常重要点&#xff0c;有原生态的反序列化&#xff0c;还有一些特定漏洞情况下的。今天主要讲一下原生态的反序列化&#xff0c;这部分内容对于没Java基础的来说可能有点难&#xff0c;包括我。 序列化与反序列化 序列化&#xff1a;将内存…

现代网络架构PCI DSS合规范围确定和网络分割措施实施探讨

本文为atsec和作者技术共享类文章&#xff0c;旨在共同探讨信息安全业界的相关话题。未经许可&#xff0c;任何单位及个人不得以任何方式或理由对本文的任何内容进行修改。转载请注明&#xff1a;atsec信息安全和作者名称 1 引言 支付卡行业数据安全标准 &#xff08;P…

docker快速部署gitlab

文章目录 场景部署步骤默认账号密码效果 场景 新增了一台机器, 在初始化本地开发环境&#xff0c;docker快速部署gitlab 部署步骤 编写dockerfile version: 3.7services:gitlab:image: gitlab/gitlab-ce:latestcontainer_name: gitlabrestart: alwayshostname: gitlabenviron…

Kubernetes 01

MESOS&#xff1a;APACHE 分布式资源管理框架 2019-5 Twitter退出&#xff0c;转向使用Kubernetes Docker Swarm 与Docker绑定&#xff0c;只对Docker的资源管理框架&#xff0c;阿里云默认Kubernetes Kubernetes&#xff1a;Google 10年的容器化基础框架&#xff0c;borg…

芯科科技率先支持Matter 1.4,推动智能家居迈向新高度

Matter 1.4引入核心增强功能、支持新设备类型&#xff0c;持续推进智能家居互联互通 近日&#xff0c;连接标准联盟&#xff08;Connectivity Standard Alliance&#xff0c;CSA&#xff09;发布了Matter 1.4标准版本。作为连接标准联盟的重要成员之一&#xff0c;以及Matter标…

Redis 分布式锁实现方案

一、概述 分布式锁&#xff0c;即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题&#xff0c;而分布式锁&#xff0c;就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是&#xff0c;分布式系统中竞争共享资源的最小粒度从线程升级…

数据结构-最小生成树

一.最小生成树的定义 从V个顶点的图里生成的一颗树&#xff0c;这颗树有V个顶点是连通的&#xff0c;有V-1条边&#xff0c;并且边的权值和是最小的,而且不能有回路 二.Prim算法 Prim算法又叫加点法&#xff0c;算法比较适合稠密图 每次把边权最小的顶点加入到树中&#xff0…

ASP.NET Web(.Net Framework) Http服务器搭建以及IIS站点发布

ASP.NET Web&#xff08;.Net Framework&#xff09; Http服务器搭建以及IIS站点发布 介绍创建ASP.NET Web &#xff08;.Net Framework&#xff09;http服务器创建项目创建脚本部署Http站点服务器测试 Get测试编写刚才的TestWebController.cs代码如下测试写法1测试写法2 Post测…

【AI系统】昇腾 AI 架构介绍

昇腾 AI 架构介绍 昇腾计算的基础软硬件是产业的核⼼&#xff0c;也是 AI 计算能⼒的来源。华为&#xff0c;作为昇腾计算产业⽣态的⼀员&#xff0c;是基础软硬件系统的核⼼贡献者。昇腾计算软硬件包括硬件系统、基础软件和应⽤使能等。 而本书介绍的 AI 系统整体架构&#…