Flink源码之JobMaster启动流程

Flink中Graph转换流程如下:

在这里插入图片描述

Flink Job提交时各种类型Graph转换流程中,JobGraph是Client端形成StreamGraph后经过Operator Chain优化后形成的,然后提交给JobManager的Restserver,最终转发给JobManager的Dispatcher处理。

CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, @RpcTimeout Time timeout);

本文主要解析从JobGraph转换为ExecutionGraph过程,执行栈如下:

Dispacher::submitJob
Dispacher::internalSubmitJob
Dispacher::persistAndRunJob
Dispacher::runJob
Dispacher::createJobManagerRunner
JobMasterServiceLeadershipRunnerFactory::createJobManagerRunner
JobMasterServiceLeadershipRunner:start
JobMasterServiceLeadershipRunner::grantLeadership
JobMasterServiceLeadershipRunner::startJobMasterServiceProcessAsync
JobMasterServiceLeadershipRunner::verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
JobMasterServiceLeadershipRunner::createNewJobMasterServiceProcess
DefaultJobMasterServiceProcessFactory::create
DefaultJobMasterServiceProcess::new
DefaultJobMasterServiceFactory::createJobMasterService
DefaultJobMasterServiceFactory::internalCreateJobMasterService //创建JobMaster并调用其start
JobMaster::new //调用DefaultSlotPoolServiceSchedulerFactory::createScheduler
DefaultSlotPoolServiceSchedulerFactory::createScheduler //根据调度模式选择调度器
DefaultSchedulerFactory::createInstance //创建SchedulerNG
DefaultScheduler::new //
SchedulerBase::newSchedulerBase::createAndRestoreExecutionGraph DefaultExecutionGraphFactory::createAndRestoreExecutionGraphDefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorCheckpointCoordinator::new   
PipelinedRegionSchedulingStrategy.Factory.createInstance //创建PipelinedRegionSchedulingStrategyJobMaster::start
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
PipelinedRegionSchedulingStrategy::startScheduling
PipelinedRegionSchedulingStrategy::maybeScheduleRegions
DefaultScheduler::allocateSlotsAndDeploy
DefaultScheduler::allocateSlotsSlotSharingExecutionSlotAllocator::allocateSlotsFor //分配Slot
DefaultScheduler::waitForAllSlotsAndDeployDefaultScheduler::assignAllResourcesAndRegisterProducedPartitionsDefaultScheduler::assignResource //为每个Execution分配SlotDefaultScheduler::registerProducedPartitionsDefaultScheduler::deployAllDefaultScheduler::deployOrHandleErrorDefaultScheduler::deployTaskSafeDefaultExecutionVertexOperations::deployExecutionVertex::deployExecution::deploy //提交任务向TM提交DeploymenTaskManagerGateway.submitTask

在整个提交过程中,首先获取JobMasterService的Leader权限,然后对一个JobGraph生成一个JobMaster,JobMaster先将JobGraph转换为ExecutionGraph,转换核心逻辑在DefaultExecutionGraph::attachJobGraph方法中,最后为每个Execution申请Slot资源,对每个Execution向TM提交TaskDeploymentDescriptor调度执行。

在这里插入图片描述

JobMaster管理整个Job的生命周期,主要有以下功能:

  1. 将JobGraph转换为ExecutionGraph,创建调度器调度执行
  2. 通过心跳保持与ResourceManager的连接,为当前Job向RM申请Slot资源
  3. 接受TaskManager的OfferSlot, 向TM提交task, 主动发送心跳请求保持与执行当前Job的TM的连接
  4. 创建CheckpointCoordinator,触发Checkpoint

Flink中可通过jobmanager.scheduler配置调度类型,默认为NG:

NG:new generation scheduler
Adaptive: adaptive scheduler; supports reactive mode

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/86372.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jmeter —— jmeter设置HTTP信息头管理器模拟请求头

HTTP信息头管理器 HTTP信息头管理器是在有需要模拟请求头部的时候进行设置的&#xff0c;添加方式 是 右击线程组 -- 配置元件 -- HTTP信息头管理器 可以通过抓包工具或者F12获取http请求的header头部信息&#xff1b;如下图&#xff1a; 复制并点击jmeter中的从剪贴板添加&am…

文盘 Rust -- tokio 绑定 cpu 实践

tokio 是 rust 生态中流行的异步运行时框架。在实际生产中我们如果希望 tokio 应用程序与特定的 cpu core 绑定该怎么处理呢&#xff1f;这次我们来聊聊这个话题。 首先我们先写一段简单的多任务程序。 use tokio::runtime; pub fn main() {let rt runtime::Builder::new_mu…

ffplay数据结构分析(一)

本文为相关课程的学习记录&#xff0c;相关分析均来源于课程的讲解&#xff0c;主要学习音视频相关的操作&#xff0c;对字幕的处理不做分析 下面我们对ffplay的相关数据结构进行分析&#xff0c;本章主要是对PacketQueue的讲解 struct MyAVPacketList和PacketQueue队列 ffp…

11-数据结构-栈和队列的应用(C语言)

栈和队列的应用 目录 栈和队列的应用 一、括号匹配&#xff08;栈&#xff09; 二、表达式的各种转换 (1)中缀转后缀(手工) (2)后缀转中缀表达式(手工) (3)中缀转后缀(栈) (4)中缀转后缀&#xff08;树&#xff09; (5)后缀表达式求值 (6)中缀表达式求值&#xff08;栈…

阻抗是什么?什么时候要考虑阻抗匹配?

在电路设计中&#xff0c;我们常常碰到跟阻抗有关的问题&#xff0c;那么到底什么是阻抗&#xff1f; 在具有电阻、电感和电容的电路里&#xff0c;对电路中电流所起的阻碍作用叫做阻抗。常用Z来表示&#xff0c;它的值由交流电的频率、电阻R、电感L、电容C相互作用来决定。由…

【黑马头条之xxl-Job分布式任务调度】

本笔记内容为黑马头条项目的分布式任务调度热点文章部分 目录 一、今日内容 1、需求分析 2、实现思路 3、定时计算 4、定时任务框架-xxljob 二、分布式任务调度 1、什么是分布式任务调度 2、xxl-Job简介 3、XXL-Job-环境搭建 4、配置部署调度中心-docker安装 5、xx…

系统学习Linux-Redis集群

目录 一、Redis主从复制 概念 作用 缺点 流程 二、Reids哨兵模式&#xff08;sentinel&#xff09; 概念 作用 缺点 结构 搭建 三、redis集群 概述 原理 架构细节 选举过程 实验环境模拟 一、Redis主从复制 概念 是指将一台Redis服务器的数据&#xff0c;复制…

安防监控视频汇聚平台EasyCVR分发的FLV视频流在VLC中无法播放是什么原因?

众所周知&#xff0c;TSINGSEE青犀视频汇聚平台EasyCVR可支持多协议方式接入&#xff0c;包括主流标准协议国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。在视频流的处理与分发上&#xff0c;视频监控…

java之junit Test

JUnit测试简介 1.什么是单元测试 单元测试是针对最小的功能单元编写测试代码Java程序最小的功能单元是方法单元测试就是针对单个Java方法的测试 2.测试驱动开发 3.单元测试的好处 确保单个方法运行正常如果修改了方法代码&#xff0c;只需确保其对应的单元测试通过测试代码…

佛祖保佑,永不宕机,永无bug

当我们的程序编译通过&#xff0c;能预防的bug也都预防了&#xff0c;其它的就只能交给天意了。当然请求佛祖的保佑也是必不可少的。 下面是一些常用的保佑图&#xff1a; 佛祖保佑图 ——————————————————————————————————————————…

【uniapp】uniapp设置安全区域:

文章目录 一、效果图:二、实现代码: 一、效果图: 二、实现代码: {"path": "pages/index/index","style": {"navigationStyle": "custom","navigationBarTextStyle": "white","navigationBarTitle…

List list=new ArrayList()抛出的ArrayIndexOutOfBoundsException异常

1.应用场景&#xff0c;今天生产日志监控到一组new ArrayList() 进行add 异常&#xff0c;具体日志如下&#xff1a; eptionHandler.handler(178): TXXYBUSSINESS|执行异常 java.util.concurrent.CompletionException: java.lang.ArrayIndexOutOfBoundsException: Index 1 out…

PyTorch翻译官网教程-NLP FROM SCRATCH: GENERATING NAMES WITH A CHARACTER-LEVEL RNN

官网链接 NLP From Scratch: Generating Names with a Character-Level RNN — PyTorch Tutorials 2.0.1cu117 documentation 使用字符级RNN生成名字 这是我们关于“NLP From Scratch”的三篇教程中的第二篇。在第一个教程中</intermediate/char_rnn_classification_tutor…

MongoDB的下载和安装

一、MongoD下载 下载地址&#xff1a;https://www.mongodb.com/try/download/community 二、安装 因为选择下载的是 .zip 文件&#xff0c;直接跳过安装&#xff0c;一步到位。 选择在任一磁盘创建空文件夹&#xff08;不要使用中文路径&#xff09;&#xff0c;解压之后把文…

android 如何分析应用的内存(十七)——使用MAT查看Android堆

android 如何分析应用的内存&#xff08;十七&#xff09;——使用MAT查看Android堆 前一篇文章&#xff0c;介绍了使用Android profiler中的memory profiler来查看Android的堆情况。 如Android 堆中有哪些对象&#xff0c;这些对象的引用情况是什么样子的。 可是我们依然面临…

flink kafka消费者如何处理kafka主题的rebalance

背景&#xff1a; 我们日常使用kafka客户端消费kafka主题的消息时&#xff0c;当消费者退出/加入消费者组&#xff0c;kafka主题分区数有变等事件发生时&#xff0c;都会导致rebalance的发生&#xff0c;此时一般情况下&#xff0c;如果我们不自己处理offset&#xff0c;我们不…

深入理解PyTorch中的NoamOpt优化器

深入理解PyTorch中的NoamOpt优化器 作者&#xff1a;安静到无声 个人主页 今天&#xff0c;我们将深入探讨一个在自然语言处理领域广泛使用的优化器——NoamOpt。这个优化器是基于PyTorch实现的&#xff0c;并且在"Attention is All You Need"这篇论文中首次提出。…

c++11-14-17_内存管理(RAII)_多线程

文章目录 前言&#xff1a;什么是RAII&#xff1f;指针/智能指针&#xff1a;使用智能指针管理内存资源&#xff1a;unique_ptr的使用&#xff1a;自定义删除器&#xff1a; shared_ptr的使用&#xff1a;shared_ptr指向同一个对象的不同成员&#xff1a;自定义删除函数&#x…

centos7 安装桌面

先装 xrdp $ sudo yum install -y epel-release $ sudo yum install -y xrdp $ sudo systemctl enable xrdp $ sudo systemctl start xrdp开防火墙端口 $ sudo firewall-cmd --add-port3389/tcp --permanent $ sudo firewall-cmd --reload比较喜欢 GNOME $ sudo yum groupin…

Stable Diffusion - 幻想 (Fantasy) 风格与糖果世界 (Candy Land) 人物提示词配置

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132212193 图像由 DreamShaper8 模型生成&#xff0c;融合糖果世界。 幻想 (Fantasy) 风格图像是一种以想象力为主导的艺术形式&#xff0c;创造了…