深度了解flink(七) JobManager(1) 组件启动流程分析

前言

JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。

集群启动模式

ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下

通过上图可知道,Flink有3种集群模式

Flink Session集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
  • KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
  • YarnSessionClusterEntrypoint Yarn session模式下集群的入口类

集群生命周期

在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:

Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

适用场景:

因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等

Flink Per Job集群

只要Yarn提供了继承的子类:YarnJobClusterEntrypoint

集群生命周期

在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。

Flink Application集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
  • KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
  • YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类

集群生命周期

Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业

JobManager启动流程

JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。

main方法入口

public static void main(String[] args) {// 打印系统相关信息EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);//信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭SignalHandler.register(LOG);//注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭JvmShutdownSafeguard.installAsShutdownHook(LOG);// 解析命令行参数,获取配置信final EntrypointClusterConfiguration entrypointClusterConfiguration =ClusterEntrypointUtils.parseParametersOrExit(args,new EntrypointClusterConfigurationParserFactory(),StandaloneSessionClusterEntrypoint.class);//加载config.yaml,构建Configuration对象Configuration configuration = loadConfiguration(entrypointClusterConfiguration);StandaloneSessionClusterEntrypoint entrypoint =new StandaloneSessionClusterEntrypoint(configuration);ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

主要步骤

  1. 打印系统信息。
  2. 注册信号处理器,注册系统级别的信号,确保优雅关闭。
  3. 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
  4. 解析命令行参数,加载配置文件。
  5. 初始化 StandaloneSessionClusterEntrypoint
  6. 调用 ClusterEntrypoint#runClusterEntrypoint 方法启动集群。

ClusterEntrypoint#runClusterEntrypoint

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {//clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);
}//无关代码 无需关注
}

核心步骤

  • 调用 clusterEntrypoint.startCluster() 启动集群。

ClusterEntrypoint#startCluster

public void startCluster() throws ClusterEntrypointException {//无关代码 无需关注try {FlinkSecurityManager.setFromConfiguration(configuration);//插件管理类,用来加载插件。插件加载两种方式。//1).通过如下参数配置FLINK_PLUGINS_DIR。//2).将插件jar包放入到plugins下PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);//初始化文件系统的配置configureFileSystems(configuration, pluginManager);//初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,//使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。//总结:初始化安全环境,创建安全环境的时候会做一系列的检查。SecurityContext securityContext = installSecurityContext(configuration);ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);//安全的情况下调用runCluster开始初始化组件securityContext.runSecured((Callable<Void>)() -> {runCluster(configuration, pluginManager);return null;});} catch (Throwable t) {//异常处理代码 无需关注}}

startCluster方法主要做了一些环境和配置初始化的工作

主要步骤

  1. 初始化插件管理器,用来加载插件。
  2. 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
  3. 初始化安全环境。
  4. 安全环境下调用 runCluster 方法。

ClusterEntrypoint#runCluster

private void  runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {//初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等initializeServices(configuration, pluginManager);// write host information into configurationconfiguration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());//创建Dispatcher和ResourceManger组件的工厂类final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建Dispatcher和ResourceManger组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,resourceId.unwrap(),ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,delegationTokenManager,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),failureEnrichers,this);//组件停止运行后的异步方法clusterComponent.getShutDownFuture().whenComplete(//代码省略)}}

主要步骤

1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等

2.创建Dispatcher和ResourceManger组件的工厂类

3.创建Dispatcher和ResourceManger组件

4.定义组件停止运行后的异步方法

总结

本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/461461.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习模型入门教程指南

在当前的人工智能生成内容&#xff08;AIGC&#xff09;领域中&#xff0c;深度学习模型无疑是支撑其技术核心的关键组件。深度学习模型的广泛应用极大地推动了图像生成、自然语言处理和自动化工作流的发展&#xff0c;本文将从多个角度介绍深度学习模型的概念、构建过程、实际…

C语言指针的介绍

零.导言 在日常生活中&#xff0c;我们常常在外出时居住酒店&#xff0c;细心的你一定能发现酒店不同的房间上有着不同的门牌号&#xff0c;上面写着像308&#xff0c;512之类的数字。当你定了酒店之后&#xff0c;你就会拿到一个写有门牌号的钥匙&#xff0c;凭着钥匙就能进入…

【Spring MVC】DispatcherServlet 请求处理流程

一、 请求处理 Spring MVC 是 Spring 框架的一部分&#xff0c;用于构建 Web 应用程序。它遵循 MVC&#xff08;Model-View-Controller&#xff09;设计模式&#xff0c;将应用程序分为模型&#xff08;Model&#xff09;、**视图&#xff08;View&#xff09;和控制器&#x…

[ 问题解决篇 ] win11远程桌面报错:出现身份验证错误要求的函数不受支持(附完整解决方案)

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

汽车免拆诊断案例 | 2010款起亚赛拉图车发动机转速表指针不动

故障现象  一辆2010款起亚赛拉图车&#xff0c;搭载G4ED 发动机&#xff0c;累计行驶里程约为17.2万km。车主反映&#xff0c;车辆行驶正常&#xff0c;但组合仪表上的发动机转速表指针始终不动。 故障诊断  接车后进行路试&#xff0c;车速表、燃油存量表及发动机冷却温度…

自动化运维

自动化运维是指使用工具和脚本自动化管理、配置、监控和维护IT基础设施的过程。通过自动化运维&#xff0c;可以提高工作效率&#xff0c;减少人为错误&#xff0c;增加系统的可预测性和稳定性。以下是实现自动化运维的常见步骤和工具&#xff1a; 常见步骤&#xff1a; 1. 定义…

驱动——线程断链和信息获取

实验环境&#xff1a;win7 x32 断链&#xff1a; #include <ntifs.h>NTSTATUS EnumThread(ULONG ulPid, ULONG ulTid) {PEPROCESS pProcessAddr PsGetCurrentProcess();PLIST_ENTRY pHeadlink (PLIST_ENTRY)((ULONG)pProcessAddr 0xb8);PLIST_ENTRY pNextlink pHead…

AWD挨打记录

前言 昨天参加了星盟的AWD集训&#xff0c;本来寻思能猛猛乱杀&#xff0c;结果加固时间只有20分钟&#xff0c;WAF还没push上去就被三家上了不死马QAQ cms是站帮主&#xff0c;之前没打过&#xff0c;D盾啥也没扫出来&#xff0c;还寻思是个贼安全的系统&#xff0c;结果洞满…

鸿蒙打包hvigorw clean报错No npmrc file is matched in the current user folder解决

问题 在执行hvigorw clean等命令时&#xff0c;报错如下&#xff1a; Error: The hvigor depends on the npmrc file. No npmrc file is matched in the current user folder. Configure the npmrc file first解决方案 在用户当前目录下新建.npmrc文件&#xff0c;并配置如下…

前端如何实现进度条

将进度条的宽度动态控制&#xff0c;通过css的transition动画来控制 <template><div class"container"><div class"base-progress"><div class"inner" :style"{ width: w % }"><div class"text&qu…

SWAT-MODFLOW地表水与地下水耦合实践技术

耦合模型被应用到很多科学和工程领域来改善模型的性能、效率和结果&#xff0c;SWAT作为一个地表水模型可以较好的模拟主要的水文过程&#xff0c;包括地表径流、降水、蒸发、风速、温度、渗流、侧向径流等&#xff0c;但是对于地下水部分的模拟相对粗糙&#xff0c;考虑到SWAT…

江协科技STM32学习- P27 实验-串口发送/串口接收

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

Linux笔记--基础入门

文章目录 Linux基础知识点文件目录*磁盘分区**基础命令*Linux运行级别关机重启手册alias别名ntsysv系统服务管理程序 Linux常用命令命令分类命令行格式选项参数 命令行辅助操作 真常用命令()help命令&#xff1a;帮助指令man手册页manual page绝对路径与相对路径绝对路径&#…

11月1日星期五今日早报简报微语报早读

11月1日星期五&#xff0c;农历十月初一&#xff0c;早报#微语早读。 1、六大行今日起实施存量房贷利率新机制。 2、谷歌被俄罗斯罚款35位数&#xff0c;罚款远超全球GDP。 3、山西吕梁&#xff1a;女性35岁前登记结婚&#xff0c;给予1500元奖励。 4、我国人均每日上网时间…

Pandas DataFrame学习补充

1. 从字典创建&#xff1a;字典的键成为列名&#xff0c;值成为列数据。 import pandas as pd# 通过字典创建 DataFrame df pd.DataFrame({Column1: [1, 2, 3], Column2: [4, 5, 6]}) 2. 从列表的列表创建&#xff1a;外层列表代表行&#xff0c;内层列表代表列。 df pd.Da…

<项目代码>YOLOv8 煤矸石识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

推荐一款功能强大的文字处理工具:Atlantis Word Processor

Atlantis word proCEssor是一款功能强大的文字处理工具。该软件可以让用户放心的去设计文档&#xff0c;并且软件的界面能够按用户的意愿去自定义&#xff0c;比如工具栏、字体选择、排版、打印栏等等&#xff0c;当然还有更多的功能&#xff0c;比如你还可以吧软件界面中的任何…

「虚拟现实中的心理咨询:探索心灵世界的新方法」

内容概要 当我们想到虚拟现实时&#xff0c;很多人会联想到游戏或娱乐&#xff0c;但如今其在心理咨询领域的应用正在逐渐崭露头角。传统的心理咨询方式常常局限在咨询室内&#xff0c;面临着空间和情感隔阂的问题。然而&#xff0c;沉浸式环境的出现&#xff0c;使得治疗者能…

图像修复与重建——几何失真(畸变)的概念

一 几何失真&#xff08;畸变&#xff09;的概念 在实际的成像系统中&#xff0c;图像捕捉介质平面和物体平面之间不可避免地存在有一定的转角和倾斜角。转角对图像的影响是产生图像旋转&#xff0c;倾斜角的影响表现为图像发生投影变形。另外一种情况是由于摄像机系统本身的原…

Spark的集群环境部署

一、Standalone集群 1.1、架构 架构&#xff1a;普通分布式主从架构 主&#xff1a;Master&#xff1a;管理节点&#xff1a;管理从节点、接客、资源管理和任务 调度&#xff0c;等同于YARN中的ResourceManager 从&#xff1a;Worker&#xff1a;计算节点&#xff1a;负责利…