大数据-玩转数据-Flink 容错机制

一、概述

在分布式架构中,当某个节点出现故障,其他节点基本不受影响。在 Flink 中,有一套完整的容错机制,最重要就是检查点(checkpoint)。

二、检查点(Checkpoint)

在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。

三、检查点快照的实现算法

1、简单算法:暂停应用,然后开始做检查点, 再重新恢复应用 。
2、Flink的改进Checkpoint算法. Flink的checkpoint机制原理自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

重要概念:流的barrier
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark)。这些barrier不会跨越流中的数据。每个barrier会把数据流分成两部分: 一部分数据进入当前的快照 , 另一部分数据进入下一个快照 。每个barrier携带着快照的id。barrier 不会暂停数据的流动, 所以非常轻量级。 在流中,同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照。

Flink的检查点制作过程
1、Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint,然后Source Task会在数据流中安插CheckPoint barrier;

2、source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有进来的 barrier 才会执行相应的 Checkpoint(barrier对齐, 但是新版本有一种新的barrier);

3、当 task 完成 state checkpoint后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator;

4、下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照;

5、同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator;

6、最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

严格一次语义: barrier对齐

在多并行度下, 如果要实现严格一次, 则要执行barrier对齐。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

1、当operator收到数字流的barrier n时, 它就不能处理(但是可以接收)来自该流的任何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录;

2、接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理;

3、 Checkpoint barrier n之后的数据 123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier n到达之后才会开始处理;

一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。

至少一次语义: barrier不对齐

假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算。

savepoint原理

1、Flink 还提供了可以自定义的镜像保存功能,就是保存(savepoints)
2、原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
3、Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
4、保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等。

四、Kafka+Flink+Kafka 实现端到端严格一次

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

  1. 内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性
  2. source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
  3. sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
    内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

具体的两阶段提交步骤总结如下:

  1. 某个checkpoint的第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka分区日志但标记为未提交,这就是“预提交”(第一阶段提交)
  2. jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier
    的算子状态后端会进行相应进行checkpoint,并通jobmanagerr
  3. sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知
    jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  4. jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  5. sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据(第二阶段提交)
  6. 外部kafka关闭事务,提交的数据可以正常消费了

在这里插入图片描述

五、代码中测试Checkpoint

package com.lyh.flink10;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;import java.util.Properties;public class kafka_flink_kafka_checkpoint {public static void main(String[] args) throws Exception {Properties sourceproperties = new Properties();sourceproperties.setProperty("bootstrap.servers","hadoop100:9092");sourceproperties.setProperty("group.id", "kafka_flink_kafka_checkpoint");sourceproperties.setProperty("auto.offset.reset","latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()).setParallelism(3);env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:8020/flink/checkpoints/rocksdb"));// 每 1000ms 开始一次 checkpointenv.enableCheckpointing(1000);// 高级选项:// 设置模式为精确一次 (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确认 checkpoints 之间的时间会进行 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// Checkpoint 必须在一分钟内完成,否则就会被抛弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 开启在 job 中止后仍然保留的 externalized checkpointsenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.addSource(new FlinkKafkaConsumer<String>("s1",new SimpleStringSchema(),sourceproperties)).map(line ->{String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));}).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {private ValueState<Integer> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class));}@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {Integer lastVC = value.getVc() == null ? 0 : value.getVc();if ((Math.abs(value.getVc())-lastVC)>=10) {out.collect(value.getId()+"红色告警");}state.update(value.getVc());}}).addSink(new FlinkKafkaProducer<String>("hadoop100:9092","alert",new SimpleStringSchema()));env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128681.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初识docker

目录 docker解决的问题1. 开发、测试和运维人员之间的矛盾2. 更轻量的虚拟化&#xff0c;节省了虚拟机的性能损耗 虚拟机与容器的区别1. 虚拟机2. 容器 Docker 系统架构 docker解决的问题 1. 开发、测试和运维人员之间的矛盾 “程序在我这跑得好好的&#xff0c;在你那怎么就…

Qt的窗口系统

代码仓库以及参考文件见文章底部 坐标体系 要想学好GUI,界面的坐标系首先要搞清楚 在Qt编程中,以左上角为原点,X向右增加,Y向下增加。 对于所有嵌套的窗口,其坐标是相对于父窗口来说的。 QWidget 所有窗口以及窗口控件都是从QWidget直接或者间接派生出来的。 对象模…

手写Spring:第5章-注入属性和依赖对象

文章目录 一、目标&#xff1a;注入属性和依赖对象二、设计&#xff1a;注入属性和依赖对象三、实现&#xff1a;注入属性和依赖对象3.0 引入依赖3.1 工程结构3.2 注入属性和依赖对象类图3.3 定义属性值和属性集合3.3.1 定义属性值3.3.2 定义属性集合 3.4 Bean定义补全3.5 Bean…

21.添加websocket模块

这里默认读者了解websocket协议&#xff0c;若是还不了解可以看下这篇文章wesocket协议。 websocket主要有三个步骤&#xff0c;1通过HTTP进行握手连接&#xff0c;2进行双向通信&#xff0c;3.协商断开连接 第一步的握手连接需要HTTP&#xff0c;所以还需要使用到上一节讲解…

Python实现猎人猎物优化算法(HPO)优化BP神经网络回归模型(BP神经网络回归算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 猎人猎物优化搜索算法(Hunter–prey optimizer, HPO)是由Naruei& Keynia于2022年提出的一种最新的…

OpenCV(三十三):计算轮廓面积与轮廓长度

1.介绍轮廓面积与轮廓长度 轮廓面积&#xff08;Contour Area&#xff09;是指轮廓所包围的区域的总面积。通常情况下&#xff0c;轮廓面积的单位是像素的平方。 轮廓长度&#xff08;Contour Length&#xff09;又称周长&#xff08;Perimeter&#xff09;&#xff0c;表示轮廓…

Unity 从0开始编写一个技能编辑器_01_分析需求

入职以来一直很想实现一个技能编辑器&#xff0c;在积累了一些经验以后&#xff0c;决定利用ScriptableObject开发一个&#xff0c;在此记录 1.简单的需求分析 在游戏开发中&#xff0c;技能系统是一个至关重要的组成部分。技能决定了游戏角色可以执行的各种动作&#xff0c;例…

代码随想录算法训练营第十八天|513. 找树左下角的值|112. 路径总和|106. 从中序与后序遍历序列构造二叉树

513. 找树左下角的值 题目&#xff1a;给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1: 输入: root [2,1,3] 输出: 1 思路一&#xff1a;层序遍历&#xff0c;最后一层的第一个元素&#xff0c;即…

java实时监控mysql数据库变化

对于二次开发来说&#xff0c;很大一部分就找找文件和找数据库的变化情况 对于数据库变化。还没有发现比较好用的监控数据库变化监控软件。 今天&#xff0c;我就给大家介绍一个如何使用mysql自带的功能监控数据库变化 1、打开数据库配置文件my.ini &#xff08;一般在数据库…

c语言 2.0

1.数据类型 数据类型介绍 数据类型&#xff1a;c语言中数据类型有3种&#xff0c;分别是基本数据类型、构造数据类型、指针数据类型。 数据类型的作用&#xff1a;编译器预算数据分配的内存空间大小。 ps&#xff1a;可以通俗理解为&#xff1a;数据类型是用来规范内存的开销…

python DVWA文件上传POC练习

先直接测试POC 抓包 GET /dv/vulnerabilities/sqli/?id1%27unionselect1%2Cmd5%28123%29%23&SubmitSubmit HTTP/1.1Host: 10.9.75.161Upgrade-Insecure-Requests: 1User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrom…

Tomcat服务的部署及配置优化

文章目录 1. Tomcat的相关介绍1.1 Tomcat简介1.2 Tomcat的核心组件1.2.1 Web容器1.2.2 Servlet容器1.2.3 JSP容器 1.3 Tomcat的功能组件1.3.1 connector连接器1.3.2 container容器1.3.2.1 子容器及其相关功能 1.4 主要作用1.5 Tmocat处理请求的过程 2. Tomcata服务部署2.1 安装…

log4qt库的使用

log4qt库的使用 一,什么是log4qt?二,log4qt的下载三,如何集成log4qt?1.在vs2022中集成log4qt的方法:模块一:配置log4qt的步骤步骤一,将下好的log4qt库进行解压,然后再库文件中,新建build和Log4Qt文件夹步骤二,打开cmake,有两个填写路径的位置.步骤三,点击cmake的configure按钮…

tcp满开始和拥塞避免

tcp的拥塞控制有四种算法&#xff0c;后面的快重传和快恢复是后面新增的&#xff0c; 刚开始会初始化慢开始门限值&#xff0c;并将拥塞窗口值为1往网络中发送&#xff0c;若收到确认包则将拥塞窗口翻倍&#xff0c;执行慢开始算法&#xff0c;当拥塞窗口值达到慢开始门限后&am…

02-Tomcat打破双亲委派机制

上一篇&#xff1a;01-从JDK源码级别剖析JVM类加载机制 Tomcat 如果使用默认的双亲委派类加载机制行不行&#xff1f; 我们思考一下&#xff1a;Tomcat是个web容器&#xff0c; 那么它要解决什么问题&#xff1a; 一个web容器可能需要部署两个应用程序&#xff0c;不同的应用…

Alibaba(获得店铺的所有商品) API 接口

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个alibaba应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载alibaba API的SDK并掌握基本的API基础知识和调用 4&#xff09;利…

深入浅出Android同步屏障机制

原文链接 Android Sync Barrier机制 诡异的假死问题 前段时间&#xff0c;项目上遇到了一个假死问题&#xff0c;随机出现&#xff0c;无固定复现规律&#xff0c;大量频繁随机操作后&#xff0c;便会出现假死&#xff0c;整个应用无法操作&#xff0c;不会响应事件&#xff…

【Linux】Systemd 中的单元(Unit)和单元文件(Unit File)怎么理解?

单元&#xff08;Unit&#xff09;单元文件&#xff08;Unit File&#xff09;感谢 &#x1f496; 关于systemd是什么&#xff0c;http://t.csdn.cn/pMkG7这篇文章里有详细说明。 这篇文件我们一起来看看Systemd 中的单元&#xff08;Unit&#xff09;和单元文件&#xff08;Un…

vue使用jsencrypt实现rsa前端加密

实现 RSA 加密 介绍 vue 完成 rsa 加密传输&#xff0c;jsencrypt 实现参数的前端加密 1 安装 jsencrypt npm install jsencrypt2 编写 jsencrypt.js 在 utils 文件夹中新建 jsencrypt.js 文件&#xff0c;内容如下&#xff1a;注意点&#xff1a;一般公钥都是后端生成好的&a…

excl在建模语言中的运用

目录 1.表格的定位 2.数学函数 3.自动填充功能 4.数据透视表的应用 5.切片器 6. Date(),time(),now()&#xff0c;today() 7.文本转日期 8.分裂 9.sumif函数 10.数字转换为文本的方法 11.SUMIFS()函数&#xff1a;多个条件筛选 12.宏 13.提取多个表中&#xff0c;…