【详细介绍及演示】Flink之checkpoint检查点的使用

目录

一、介绍

二、 设置checkpoint检查点演示

1、 代码演示

 2、测试代码效果

3、查看快照情况

​编辑

三、在集群上运行

1、第一次运行

2、第二次运行

四、自定义检查点savePoint

1、提交一个flink job  打成jar包

2、输入一些数据,观察单词对应的数字的变化

​编辑 3、执行savepoint操作,添加检查点

 4、查看最近完成的flink job对应的savepoint

5、重新启动flink job,进行测试

6、观察变化

五、总结


一、介绍

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

二、 设置checkpoint检查点演示

简单的举例说明:

1、 代码演示

package com.bigdata.checkpoint;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class CheckPointWordCountDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint默认重启策略是一直重启,可以自己定义重启策略//重启策略可以单独使用,不设置checkpoint也可使用   //savepoint可以手动使用命令设置checkpoint//2分钟内重启3次,重启时间间隔是5senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2, TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> source = env.socketTextStream("localhost", 9999);//3. transformation-数据处理转换source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {collector.collect(word);}}}).map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {return Tuple2.of(word,1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}}).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

 2、测试代码效果

首先启动本地的nc, 启动hdfs服务

3、查看快照情况

运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。

三、在集群上运行

 首先启动flink:start-cluster.sh

由上一步可以发现数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:

1、第一次运行

在本地先clean, 再package ,再Wagon一下:

在bigdata01服务器上执行以下命令 

#flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar#记得,先启动nc ,再启动任务,否则报错!

通过nc -lk 9999 输入以下内容:

进入bigdata01:8081页面上查看结果 

想查看运行结果,可以通过使用的slot数量判断一下:

取消flink job的运行

 查看一下这次的单词统计到哪个数字了:

2、第二次运行

#flink run -c 全类名  -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34  /opt/app/flink-test-1.0-SNAPSHOT.jar#启动
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603  /opt/app/FlinkDemo-1.0-SNAPSHOT.jar

-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样

从上一次离开时,截止的checkpoint目录

观察数据:在nc 上输入一个hello,1 得到新的结果hello,8

四、自定义检查点savePoint

checkpoint自动完成state快照、savePoint是手动的完成快照。

如果程序在没有设置checkpoint的情况,可以通过savePoint设置state快照

1、提交一个flink job  打成jar包

package com.bigdata.checkpoint;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class CheckPointWordCountDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//重启策略可以单独使用,不设置checkpoint也可使用  //savepoint可以手动使用命令设置checkpoint//2分钟内重启3次,重启时间间隔是5senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2, TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> source = env.socketTextStream("bigdata01", 2727);//3. transformation-数据处理转换source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(",");for (String word : arr) {collector.collect(word);}}}).map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {//自制一个bug用来测试if(word.equals("bug")){throw new Exception("出错了,请重试");}return Tuple2.of(word,1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}}).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

执行改任务 

2、输入一些数据,观察单词对应的数字的变化

 3、执行savepoint操作,添加检查点

  • 停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://bigdata01:9820/flink-savepoint  152e493da9cdeb327f6cbbad5a7f8e41

    后面的序号为Job 的ID

  • 不会停止flink的job,只是完成savepoint操作(执行这个操作)
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
  •  停止一个 flink 的任务
flink stop 6a27b580aa5c6b57766ae6241d9270ce

    后面的序号为Job 的ID

 4、查看最近完成的flink job对应的savepoint

 

发现任务中已经有检查点 

5、重新启动flink job,进行测试

停止任务后,查看最终检查点的路径

然后重新启动

flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar

-c后是全类名,-s 后是检查的路径 ,最后一部分是jar包的位置

6、观察变化

再次输入单词,可以看到在之前的基础上累加

另外,在集群中运行我们的程序,默认并行度为1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的

五、总结

有两种添加检查点的方式:

1、在java代码中自动添加

在执行任务时会在hdfs上创建检查点

// 第一句:开启快照,每隔1s保存一次快照
env.enableCheckpointing(1000);
// 第二句:设置快照保存的位置
env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));
// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2、在集群上通过命令在指定位置手动添加

flink savepoint 任务号  hdfs://bigdata01:9820/flink-savepoint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479589.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA篇05 —— 内部类(Local、Anonymous、Member、Static)

欢迎来到我的主页&#xff1a;【一只认真写代码的程序猿】 本篇文章收录于专栏【小小爪哇】 如果这篇文章对你有帮助&#xff0c;希望点赞收藏加关注啦~ 目录 1 内部类Inner Class 1.1 局部内部类 1.2 匿名内部类&#xff08;※※&#xff09; 1.3 匿名类最佳实践&#xf…

Spring Boot 与 Spring Cloud Alibaba 版本兼容对照

版本选择要点 Spring Boot 3.x 与 Spring Cloud Alibaba 2022.0.x Spring Boot 3.x 基于 Jakarta EE&#xff0c;javax.* 更换为 jakarta.*。 需要使用 Spring Cloud 2022.0.x 和 Spring Cloud Alibaba 2022.0.x。 Alibaba 2022.0.x 对 Spring Boot 3.x 的支持在其发行说明中…

jsp的pageContext对象

jsp的pageContext对象 是页面的上下文对象&#xff0c;表示当前页面运行环境&#xff0c;用于获取当前页面jsp页面信息&#xff0c;作用范围为当前的jsp页面 pageContext对象可以访问当前页面的所有jsp内置对象 jsp的四种内置对象 4中作用域&#xff1a;pagecontext,request…

网络安全在数字时代保护库存数据中的作用

如今&#xff0c;通过软件管理库存已成为一种标准做法。企业使用数字工具来跟踪库存水平、管理供应链和规划财务。 然而&#xff0c;技术的便利性也带来了网络威胁的风险。黑客将库存数据视为有价值的目标。保护这些数据不仅重要&#xff0c;而且必不可少。 了解网络安全及其…

Python图像处理:打造平滑液化效果动画

液化动画中的强度变化是通过在每一帧中逐渐调整液化效果的强度参数来实现的。在提供的代码示例中&#xff0c;强度变化是通过一个简单的线性插值方法来控制的&#xff0c;即随着动画帧数的增加&#xff0c;液化效果的强度也逐渐增加。 def liquify_image(image, center, radius…

day2全局注册

全局注册代码&#xff1a; //文件核心作用&#xff1a;导入App.vue,基于App.vue创建结构渲染index.htmlimport Vue from vue import App from ./App.vue //编写导入的代码&#xff0c;往代码的顶部编写&#xff08;规范&#xff09; import HmButton from ./components/Hm-But…

wireshark基础

免责声明&#xff1a; 笔记的只是方便各位师傅学习知识&#xff0c;以下代码、网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 泷羽sec官网&#xff1a;https://longyusec.com/ 泷羽sec B站地址&#xff1a;https:/…

学习笔记037——Java中【Synchronized锁】

文章目录 1、修饰方法1.1、静态方法&#xff0c;锁定的是类1.2、非静态方法&#xff0c;锁定的是方法的调用者&#xff08;对象&#xff09; 2、修饰代码块&#xff0c;锁定的是传入的对象2.1、没有锁之前&#xff1a;2.2、有锁后&#xff1a; 实现线程同步&#xff0c;让多个线…

⭐️ GitHub Star 数量前十的工作流项目

文章开始前&#xff0c;我们先做个小调查&#xff1a;在日常工作中&#xff0c;你会使用自动化工作流工具吗&#xff1f;&#x1f64b; 事实上&#xff0c;工作流工具已经变成了提升效率的关键。其实在此之前我们已经写过一篇博客&#xff0c;跟大家分享五个好用的工作流工具。…

智能桥梁安全运行监测系统守护桥梁安全卫士

一、方案背景 桥梁作为交通基础设施中不可或缺的重要组成部分&#xff0c;其安全稳定的运行直接关联到广大人民群众的生命财产安全以及整个社会的稳定与和谐。桥梁不仅是连接两地的通道&#xff0c;更是经济发展和社会进步的重要纽带。为了确保桥梁的安全运行&#xff0c;桥梁安…

[创业之路-155] :《领先的密码-BLM方法论全面解读与应用指南》- 综合管理框架

目录 一、BLM&#xff08;业务领先模型&#xff09;综合管理框架 1、BLM模型的起源与发展 2、BLM模型的核心组成 3、BLM模型的战略规划与执行流程 4、BLM模型的应用价值 二、BLM&#xff08;业务领先模型&#xff09;实施案例 1. 华为的实施案例 2. 某知名企业A的实施案…

用Java爬虫“搜刮”工厂数据:一场数据的寻宝之旅

引言&#xff1a;数据的宝藏 在这个数字化的时代&#xff0c;数据就像是隐藏在数字丛林中的宝藏&#xff0c;等待着勇敢的探险家去发掘。而我们&#xff0c;就是那些手持Java魔杖的现代海盗&#xff0c;准备用我们的爬虫船去征服那些数据的海洋。今天&#xff0c;我们将一起踏…

redis 底层数据结构

概述 Redis 6 和 Redis 7 之间对比&#xff1a; Redis6 和 Redis7 最大的区别就在于 Redis7 已经用 listpack 替代了 ziplist. 以下是基于 Redis 7基础分析。 RedisObject Redis是⼀个<k,v>型的数据库&#xff0c;其中key通常都是string类型的字符串对象&#xff0c;⽽…

UE5 实现组合键触发事件的方法

因为工作原因。 需要用大括号{和}来触发事件 但是在蓝图中搜了一下&#xff0c;发现键盘事件里根本就没有{}这两个键。 花费了一下午&#xff0c;终于找到解决的方法了&#xff0c;也就是增强输入的弦操作 首先创建一个项目 纯蓝图或者C都可行 进入到内容浏览器的默认页面 …

使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用

文章目录 一、前言二、 工具准备&#xff1a;三、最终效果示例四、具体步骤第一大部分是配置阿里云1. 首先登录阿里云容器镜像服务 [服务地址](https://cr.console.aliyun.com/cn-hangzhou/instances)2. 选择个人版本3. 创建 命名空间4. 进入访问凭证来查看&#xff0c;用户名字…

用两个栈实现队列 剑指offer

题目描述 用两个栈实现一个队列。队列声明如下图&#xff0c;请实现它的两个函数appendTail和deleteHead,分别完成在队尾插入节点和队头删除节点的功能。 代码实现 测试用例 相关题目

本地推流,服务器拉流全流程

本地推流&#xff0c;服务器拉流全流程 环境准备&#xff1a;准备一台服务器&#xff0c;其中openssl最好为1.1.1版本&#xff08;可以直接使用ubuntu20.04操作系统&#xff09; 服务器拉流 1、 安装环境依赖 sudo apt-get update sudo apt-get install unzip sudo apt-get…

打开windows 的字符映射表

快捷键 win R 打开资源管理器 输入: charmap 点击确定

Elasticsearch对于大数据量(上亿量级)的聚合如何实现?

大家好&#xff0c;我是锋哥。今天分享关于【Elasticsearch对于大数据量&#xff08;上亿量级&#xff09;的聚合如何实现&#xff1f;】面试题。希望对大家有帮助&#xff1b; Elasticsearch对于大数据量&#xff08;上亿量级&#xff09;的聚合如何实现&#xff1f; 1000道 …

解决首次加载数据空指针异常

起初效果&#xff1a; 使用async...await异步加载数据 最终效果&#xff1a; 代码&#xff1a; <template><div class"user-list-container"><!-- 加载状态 --><div v-if"loading" class"loading">正在加载用户数据..…