Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runClusterprivate void runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {initializeServices(configuration, pluginManager); //初始化服务// write host information into configurationconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建核心组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),this);...ignore code}
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)throws Exception {LOG.info("Initializing cluster services.");synchronized (lock) {rpcSystem = RpcSystem.load(configuration);commonRpcService =RpcUtils.createRemoteRpcService(rpcSystem,configuration,configuration.getString(JobManagerOptions.ADDRESS),getRPCPortRange(configuration),configuration.getString(JobManagerOptions.BIND_HOST),configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));// update the configuration used to create the high availability servicesconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("cluster-io"));haServices = createHaServices(configuration, ioExecutor, rpcSystem);blobServer = new BlobServer(configuration, haServices.createBlobStore());blobServer.start();heartbeatServices = createHeartbeatServices(configuration);metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress(), rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, null);final String hostname = RpcUtils.getHostname(commonRpcService);processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,hostname,ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));executionGraphInfoStore =createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());}
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/82290.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

tomcat

文章目录 启动目录结构部署原始配置方式一方式二 在idea里配置注解配置 Servlet的执行流程生命周期相关信息其他配置request的各种方法访问页面 启动 下载->bin->点击startup.bat->启动 startup.sh 是Linux下的启动 注意: 需要是配java环境变量 目录结构…

Wav2Lip实践

1. 安装 1.1 安装 conda以指定python版本运行环境 下载:Index of /https://repo.anaconda.com/archive/index.html 1.2 如按旧项目基于python3.6版本对话,会有很多包找不到的情况,经摸索后以python3.9构建成功, conda instal…

IL汇编 ldarg 指令学习

IL汇编代码, .assembly extern mscorlib {} .assembly MathLib {.ver 1 : 0 : 1 : 0 }.module MathLib.dll.namespace MyMath { .class public ansi auto MathClass extends [mscorlib]System.Object{ .method public int32 GetSquare(int32) c…

C++进阶 智能指针

本篇博客简介:介绍C中的智能指针 智能指针 为什么会存在智能指针内存泄露内存泄漏定义内存泄漏的危害如何检测内存泄漏如何避免内存泄漏 智能指针的使用及其原理RAII设计一个智能指针C官方的智能指针 定制删除器智能指针总结 为什么会存在智能指针 我们首先来看下面…

Linux常见命令

新建标签页 (gitee.com)尹相辉 (yinxianghui66) - Gitee.com新建标签页 (gitee.com) 文章目录 文章目录 一、Linux常见命令 1.ls 2.cd 目录名 3.pwd 4.touch 文件名 5.echo 字符串->目标文件 6.cat 文件名 7.man 8.vim 文件名 9.mkdir 目录名 10.rm 文件名 11.mv 源…

WEB集群——负载均衡集群

目录 一、 LVS-DR 群集。 1、LVS-DR工作原理 2、LVS-DR模式的特点 3、部署LVS-DR集群 3.1 配置负载调度器(192.168.186.100) 3.2 第一台web节点服务器(192.168.186.103) 3.3 第二台web节点服务器(192.168.186.…

tui.calender日历在vue中的使用1.0

官网:https://ui.toast.com/tui-calendar github:https://github.com/nhn/tui.calendar/tree/main 月、周、日视图都有,拖拽也比较方便,但是自己用起来比较费劲,参考文档写得不全,做个记录日后方便参考&…

Freemarker:生成HTML文本文件

前置工作参考: Freemarker:基本使用_moreCalm的博客-CSDN博客 1、修改application.yml配置文件 server:port: 8881 #服务端口 spring:application:name: freemarker-demo #指定服务名freemarker:cache: false #关闭模板缓存,方便测试settin…

数据结构----c语言复习

数据结构----c语言复习 一.类型 1.类型的种类 char 1个字节 范围-128~127 short 2个字节 范围-32768~32767 int 4个字节 范围-2147483648~2147483647 long 4个字节 范围-2147483648~2147483647 float 4个字节 有效位为6~7位 float 8个字节 有效位为15~16为 unsigned c…

Jmeter添加cookie的两种方式

jmeter中添加cookie可以通过配置HTTP Cookie Manager,也可以通过HTTP Header Manager,因为cookie是放在头文件里发送的。 实例:博客园点击添加新随笔 https://i.cnblogs.com/EditPosts.aspx?opt1 如果未登录,跳转登录页&#xf…

空地协同智能消防系统——无人机、小车协同

1 题目 1.1 任务 设计一个由四旋翼无人机及消防车构成的空地协同智能消防系统。无人机上安装垂直向下的激光笔,用于指示巡逻航迹。巡防区域为40dm48dm。无人机巡逻时可覆盖地面8dm宽度区域。以缩短完成全覆盖巡逻时间为原则,无人机按照规划航线巡逻。发…

【elementui】解决el-select组件失去焦点blur事件每次获取的是上一次选中值的问题

目录 【问题描述】 【问题摘要】 【分析问题】 【完整Test代码】 【封装自定义指令】 ↑↑↑↑↑↑↑↑↑↑↑↑ 不想看解决问题过程的可点击上方【封装自定义指令】目录直接跳转获取结果即可~~~ 【问题描述】 一位朋友遇到这么一个开发场景:在表格里面嵌入el-…

Hololens2二维码识别

配置 目前大部分Hololens进行二维码识别的开发都是基于ZXing的包完成,首先需要完成zxing.unity.dll,很多地方应该都能下载,也可以直接上github上下载(下载点这里)。 下载时注意一下版本就好,过老的zxing兼…

2023-08-02 LeetCode每日一题(翻转卡片游戏)

2023-08-02每日一题 一、题目编号 822. 翻转卡片游戏二、题目链接 点击跳转到题目位置 三、题目描述 在桌子上有 N 张卡片,每张卡片的正面和背面都写着一个正数(正面与背面上的数有可能不一样)。 我们可以先翻转任意张卡片,…

一位年薪40W的测试被开除,回怼的一番话,令人沉思

一位年薪40W测试工程师被开除回怼道:“反正我有技术,在哪不一样” 一技傍身,万事不愁,当我们掌握了一技之长后,在职场上说话就硬气了许多,不用担心被炒,反过来还可以炒了老板,这一点…

解决 Android Studio 的 Gradle 面板上只有关于测试的 task 的问题

文章目录 问题描述解决办法 笔者出问题时的运行环境: Android Studio Flamingo | 2022.2.1 Android SDK 33 Gradle 8.0.1 JDK 17 问题描述 笔者最近发现一个奇怪的事情。笔者的 Android Studio 的 Gradle 面板上居然除了用于测试的 task 之外,其它什…

Java中String方法魔性学习

这里写目录标题 先进行专栏介绍String详解常用构造方法代码演示常用成员方法代码示例总结 先进行专栏介绍 本专栏是自己学Java的旅途,纯手敲的代码,自己跟着黑马课程学习的,并加入一些自己的理解,对代码和笔记 进行适当修改。希望…

机器学习基础知识(1)

什么是机器学习 机器学习是一种通过输入大量数据来构建一种模型(网络),这个训练好的模型将会被用来预测或执行某些操作,这个训练的过程和方法就是机器学习。 我们也可以理解为构建一个“函数”,使得这个函数面对我们…

【第一阶段】kotlin的range表达式

range:范围:从哪里到哪里的意思 in:表示在 !in:表示不在 … :表示range表达式 代码示例: fun main() {var num:Int20if(num in 0..9){println("差劲")}else if(num in 10..59){println("不及格")}else if(num in 60..89…

神码ai伪原创【php源码】

大家好,小编为大家解答python必备常用英语词汇笔记的问题。很多人还不知道python中常用的英语单词,现在让我们一起来看看吧! 火车头采集ai伪原创插件截图: 一.什么是注释 注释是对一段代码的解释,不参与程序运行&…