深度了解flink(八) JobManager(2)initializeServices剖析

前言

上篇文章梳理了Flink 集群的模式以及通过源码分析了JobManager的启动流程,这篇文章更加细粒度的分析JobManger启动流程初始的服务。

initializeServices剖析

initializeServices的代码如下:

protected void initializeServices(Configuration configuration, PluginManager pluginManager)throws Exception {LOG.info("Initializing cluster services.");synchronized (lock) {resourceId =configuration.getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID).map(value ->DeterminismEnvelope.deterministicValue(new ResourceID(value))).orElseGet(() ->DeterminismEnvelope.nondeterministicValue(ResourceID.generate()));LOG.debug("Initialize cluster entrypoint {} with resource id {}.",getClass().getSimpleName(),resourceId);//创建工作目录workingDirectory =ClusterEntrypointUtils.createJobManagerWorkingDirectory(configuration, resourceId);LOG.info("Using working directory: {}.", workingDirectory);//创建rpcSystemrpcSystem = RpcSystem.load(configuration);//创建RpcServicecommonRpcService =RpcUtils.createRemoteRpcService(rpcSystem,configuration,configuration.get(JobManagerOptions.ADDRESS),getRPCPortRange(configuration),configuration.get(JobManagerOptions.BIND_HOST),configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));//根据配置实例化JMX,默认关闭。通过JMX收集Flink的监控信息JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));// update the configuration used to create the high availability servicesconfiguration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());//创建了一个固定大小的线程池,也可以理解为执行器ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("cluster-io"));//创建一个Flink中的令牌管理器,主要用到需要认证的时候,进行一次认证,以后都不在认证delegationTokenManager =DefaultDelegationTokenManagerFactory.create(configuration,pluginManager,commonRpcService.getScheduledExecutor(),ioExecutor);// Obtaining delegation tokens and propagating them to the local JVM receivers in a// one-time fashion is required because BlobServer may connect to external file systemsdelegationTokenManager.obtainDelegationTokens();//根据配置创建高可用服务haServices = createHaServices(configuration, ioExecutor, rpcSystem);//创建blob serverblobServer =BlobUtils.createBlobServer(configuration,Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),haServices.createBlobStore());//启动blob serverblobServer.start();configuration.set(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));//创建心跳服务heartbeatServices = createHeartbeatServices(configuration);//故障增强器初始化failureEnrichers = FailureEnricherUtils.getFailureEnrichers(configuration);//监控指标相关服务metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);//指标监控查询服务final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,commonRpcService.getAddress(),configuration.get(JobManagerOptions.BIND_HOST),rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, null);final String hostname = RpcUtils.getHostname(commonRpcService);//组织管理指标processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,hostname,ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));//创建ExecutionGraph的存储executionGraphInfoStore =createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());}}

该方法是线程安全的,同步代码块内对服务进行初始化

核心服务

workingDirectory

workingDirectory 主要用于存储与 JobManager 相关的临时文件和数据

主要功能

存储临时文件:workingDirectory 用于存储 JobManager 在运行过程中生成的临时文件,例如日志文件、检查点数据、配置文件等。

数据持久化:在某些情况下,workingDirectory 也可以用于存储一些需要持久化的数据,例如 BlobServer 存储的文件。

使用场景

日志文件:JobManager 的日志文件可以存储在 workingDirectory 中,便于调试和问题排查。

检查点数据:在启用检查点的情况下,检查点数据可以暂时存储在 workingDirectory 中,然后再持久化到其他存储系统。

配置文件:一些动态生成的配置文件可以存储在 workingDirectory 中,供 JobManager 使用。

BlobServer 存储:BlobServer 会使用 workingDirectory 中的一个子目录来存储上传的二进制文件。

rpcSystem/rpcService

Flink Rpc核心组件,rpcSystem用于管理Actor的生命周期和通信,rpcService用于创建RpcServer以及管理其他组件的代理地址

主要功能

节点间通信:rpcSystem 提供了一种机制,使得 Flink 集群中的不同节点(如 JobManager 和 TaskManager)能够相互通信,执行远程方法调用。

消息传递:负责在节点之间传递消息,包括任务分配、状态更新、心跳检测等。

高可用性:支持高可用性配置,确保在某个节点故障时,通信不会中断。

使用场景

任务调度:JobManager 通过 rpcSystem 向 TaskManager 分配任务。

状态报告:TaskManager 通过 rpcSystem 向 JobManager 报告任务的执行状态。

心跳检测:JobManager 和 TaskManager 通过 rpcSystem 进行心跳检测,确保节点的健康状态。

故障恢复:在节点故障时,通过 rpcSystem 进行故障检测和恢复操作。JobManager 通过 rpcSystem 向 TaskManager 分配任务。

delegationTokenManager

delegationTokenManager 是 Flink 中用于管理委托令牌(Delegation Token)的组件。委托令牌是一种安全机制,用于在分布式系统中进行身份验证和授权。在 Flink 中,delegationTokenManager 负责生成、分发和管理这些令牌,以确保任务在不同节点之间安全地执行。

主要功能

生成委托令牌:根据配置和安全策略生成委托令牌。
分发委托令牌:将生成的委托令牌分发给需要的组件,如 TaskManager。
刷新委托令牌:定期刷新委托令牌,确保其有效性和安全性。
撤销委托令牌:在必要时撤销委托令牌,防止未授权访问。
过期处理:处理过期的委托令牌,确保系统安全。

使用场景

安全认证:在 Flink 集群中,使用委托令牌进行安全认证,确保只有授权的用户和组件可以访问资源。
任务执行:在任务执行过程中,TaskManager 使用委托令牌与外部系统(如 HDFS)进行安全通信。
数据访问:在访问敏感数据时,使用委托令牌进行身份验证和授权。

haServices

haServices 是 Flink 中用于实现高可用性(High Availability, HA)的服务组件。它负责管理 Flink 集群的高可用性特性,确保在主节点(如 JobManager)发生故障时,集群能够自动恢复并继续正常运行。

主要功能

主节点选举:通过 ZooKeeper 或其他高可用性服务实现主节点的选举。

状态存储:将 Flink 集群的状态信息存储在高可用性存储中,如 ZooKeeper 或 RocksDB。

故障检测:定期检测主节点的健康状态,如果主节点故障,则触发故障恢复流程。

故障恢复:在主节点故障时,自动选举新的主节点,并恢复集群的正常运行。

使用场景

主节点故障:当主 JobManager 发生故障时,haServices 会自动选举新的主节点,确保集群继续运行。

状态持久化:在主节点切换时,haServices 确保作业状态和检查点数据不会丢失。

集群管理:管理 Flink 集群的高可用性特性,确保集群的稳定性和可靠性。

blobServer

blobServer 是 Flink 中用于管理大对象(BLOB,Binary Large Object)的组件。它负责存储和分发 Flink 作业中使用的大型二进制数据,如 JAR 文件、配置文件和其他资源文件。通过 blobServer,Flink 可以确保这些资源在集群中的各个节点之间高效、可靠地分发。

主要功能

上传大对象:将大对象上传到 blobServer,并生成一个唯一的标识符(BlobKey)。

下载大对象:根据 BlobKey 下载大对象,确保所有节点都能访问这些资源。

删除大对象:在不再需要时,删除大对象,释放存储空间。

资源缓存:缓存频繁访问的大对象,提高访问效率。

使用场景

JAR 文件分发:在提交 Flink 作业时,将 JAR 文件上传到 blobServer,并在各个 TaskManager 上下载这些 JAR 文件。

配置文件分发:将配置文件上传到 blobServer,确保所有节点都能访问这些配置文件。

资源管理:管理作业中使用的其他大型二进制数据,如模型文件、数据集等。

heartbeatServices

heartbeatServices 是 Flink 中用于实现心跳检测和管理的组件。它负责监控 Flink 集群中各个节点的健康状态,确保节点之间的通信正常,并在节点故障时及时发现和处理。通过 heartbeatServices,Flink 可以实现高可用性和故障恢复。

主要功能

发送心跳:定期向目标节点发送心跳消息,确认节点的健康状态。

接收心跳:接收来自其他节点的心跳消息,更新节点的健康状态。

故障检测:在心跳超时时,检测节点是否故障,并触发相应的故障处理机制。

状态同步:同步节点之间的状态信息,确保集群的一致性。

使用场景

节点健康监测:定期检测 JobManager 和 TaskManager 的健康状态,确保集群的正常运行。

故障恢复:在节点故障时,及时发现并触发故障恢复机制,确保作业的连续性和一致性。

状态同步:同步各个节点的状态信息,确保集群的一致性和协调性。

failureEnrichers

跟踪JobMaster故障信息 组件FailureEnricher 允许用户自定义逻辑 以分类故障、暴露自定义指标或调用外部通知系统。这对于监控调试和优化Flink作业至关重要

metricRegistry

跟踪所有已注册的 Metrics 用来作为 MetricGroups MetricReporter 之间的连接。

主要功能

注册度量:将度量指标注册到 metricRegistry,使其可以被收集和报告。

收集度量:定期收集度量数据,确保数据的实时性和准确性。

报告度量:将度量数据报告给外部系统,支持实时监控和历史数据分析。

度量查询:提供查询接口,允许用户查询特定的度量数据。

使用场景

性能监控:监控 Flink 作业和任务的性能指标,如 CPU 使用率、内存使用情况、网络带宽等。

故障诊断:通过度量数据,帮助开发者诊断和解决性能问题。

优化分析:基于度量数据,进行性能优化和资源调度。

processMetricGroup

是一个用于组织和管理指标的类,包含添加度量和组的关键功能。

executionGraphInfoStore

ExecutionGraphInfoStore 是 Flink 中用于存储和管理 ExecutionGraph 信息的组件。ExecutionGraph 是 Flink 作业的内部表示,包含了作业的所有任务及其依赖关系。ExecutionGraphInfoStore 负责持久化这些信息,以便在需要时进行查询和恢复。通过 ExecutionGraphInfoStore,Flink 可以实现作业状态的持久化和恢复,支持高可用性和故障恢复。

主要功能

存储 ExecutionGraph 信息:将 ExecutionGraph 的信息持久化到存储中。

查询 ExecutionGraph 信息:提供查询接口,允许用户获取 ExecutionGraph 的相关信息。

恢复 ExecutionGraph:在作业失败时,从存储中恢复 ExecutionGraph,支持故障恢复和高可用性

使用场景

作业状态持久化:将 ExecutionGraph 的信息持久化到存储中,确保作业状态的持久化。

故障恢复:在作业失败时,从存储中恢复 ExecutionGraph,支持故障恢复和高可用性。

状态查询:提供查询接口,允许用户获取 ExecutionGraph 的相关信息,支持监控和调试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/459761.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring框架】Spring框架的开发方式

目录 Spring框架开发方式前言具体案例导入依赖创建数据库表结构创建实体类编写持久层接口和实现类编写业务层接口和实现类配置文件的编写 IoC注解开发注解开发入门(半注解)IoC常用注解Spring纯注解方式开发 Spring整合JUnit测试 Spring框架开发方式 前言…

江协科技STM32学习- P24 DMA数据转运DMA+AD多通道

🚀write in front🚀 🔎大家好,我是黄桃罐头,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流 🎁欢迎各位→点赞👍 收藏⭐️ 留言📝​…

【刷题11】CTFHub技能树sql注入系列

整数型注入 看到源码了,直接sql一套秒了 字符型注入 SQL 报错注入 构造payload 1 and (select extractvalue(1,concat(’~’,(select database())))) 后续步骤跟sql基本步骤一样 SQL 布尔注入 人工测试太麻烦,这里直接使用sqlmap,知道这有sql注入漏洞&am…

面试经典 150 题.P26. 删除有序数组中的重复项(003)

本题来自:力扣-面试经典 150 题 面试经典 150 题 - 学习计划 - 力扣(LeetCode)全球极客挚爱的技术成长平台https://leetcode.cn/studyplan/top-interview-150/ 题解: class Solution {public int removeDuplicates(int[] nums) …

docker中使用ros2humble的rviz2不显示问题

这里写目录标题 docker中使用ros2humble的rviz2不显示问题删除 Docker 镜像和容器删除 Docker 容器Linux服务器下查看系统CPU个数、核心数、(make编译最大的)线程数总结: RVIZ2 不能显示数据集 docker中使用ros2humble的rviz2不显示问题 问题描述: roo…

ELK + Filebeat + Spring Boot:日志分析入门与实践(二)

目录 一、环境 1.1 ELKF环境 1.2 版本 1.3 流程 二、Filebeat安装 2.1 安装 2.2 新增配置采集日志 三、logstash 配置 3.1 配置输出日志到es 3.2 Grok 日志格式解析 3.2 启动 logstash ​3.3 启动项目查看索引 一、环境 1.1 ELKF环境 springboot项目:w…

基于SSM土家风景文化管理系统的设计

管理员账户功能包括:系统首页,个人中心,用户管理,景点分类管理,热门景点管理,门票订单管理,旅游线路管理,系统管理 前提账号功能包括:系统首页,个人中心&…

Linux特种文件系统--tmpfs文件系统

tmpfs类似于RamDisk(只能使用物理内存),使用虚拟内存(简称VM)子系统的页面存储文件。tmpfs完全依赖VM,遵循子系统的整体调度策略。说白了tmpfs跟普通进程差不多,使用的都是某种形式的虚拟内存&a…

不同概率分布的更新过程——Python实现(均匀分布、卡方分布、指数分布等作为概率分布的更新过程)

更新过程(renewal process)是描述元件或设备更新现象的一类随机过程。以下是对更新过程的详细介绍: 一、定义与特点 定义:设对某元件的工作进行观测,假定元件的使用寿命是一随机变量,当元件发生故障时就进行修理或换上新的同类元件,而且元件的更新是即时的(修理或更换…

GIT分布式版本控制系统基础操作

问题大纲 1、什么分布式版本控制系统 2、简述Git的使用分为哪几个步骤 3、克隆和拉取的区别是什么? 4、git相关的所有指令 一、分布式版本控制系统 分布式版本控制系统是一种版本控制系统,它允许每个用户都拥有完整的项目历史记录和版本控制信息。与…

ArcGIS必会的选择要素方法(AND、OR、R、IN等)位置选择等

今天来看看ArcGIS中的几个选择的重要使用方法 1、常规选择、 2、模糊查询、 3、组合复合条件查询(AND、OR、IN), 4、空值NULL查询 5、位置选择 推荐学习: 以梦为马,超过万名学员学习ArcGIS入门到实战的应用课程…

Spring Bean创建流程

Spring Bean 创建流程图 大家总是会错误的理解Bean的“实例化”和“初始化”过程,总会以为初始化就是对象执行构造函数生成对象实例的过程,其实不然,在初始化阶段实际对象已经实例化出来了,初始化阶段进行的是依赖的注入和执行一…

rtp协议:rtcp包格式和传输间隔

RTP Control Protocol -- RTCP-rtp控制协议 实时传输控制协议(RTCP)基于对会话中的所有参与者定期传输控制包,使用与数据包相同的分发机制。底层协议必须提供数据包和控制包的多路复用,例如使用UDP时使用不同的端口号。RTCP执行四…

2024年医疗人工智能研究报告-生成式AI爆发,医疗人工智能走到新的十字路口(附下载)

前言 2024的医疗AI,既是坎坷,又是新生。 快速发展的大语言模型,携着生成式AI掠过医疗领域。过往的互联网医疗、医学影像、新药研发……一个一个场景经由新一代AI重塑,焕发出前所未有的价值。 不过,发现价值并不意味着…

网络请求自定义header导致跨域问题

我记得我的项目之前已经解决了跨域问题。 后来在功能开发着,需要添加一个自定义的header,发现又出现跨域报错。 于是又开始一通摸索折腾。 我的项目前面端是用axios网络请求,通过拦截器添加header,代码如下: //添加请…

macOS 15 Sequoia dmg格式转用于虚拟机的iso格式教程

想要把dmg格式转成iso格式,然后能在虚拟机上用,最起码新版的macOS镜像是不能用UltraISO,dmg2iso这种软件了,你直接转放到VMware里绝对读不出来,办法就是,在Mac系统中转换为cdr,然后再转成iso&am…

大语言模型数据流程源码解读(基于llama3模型)

文章目录 前言一、数据进入LlamaForCausalLM(LlamaPreTrainedModel)类二、数据进入LlamaModel(LlamaPreTrainedModel)类1、input_ids的embedding编码2、position_ids位置获取3、causal_mask因果mask构建1、causal_mask调用2、因果mask代码解读(_update_causal_mask)4、hidden_s…

MATLAB人脸考勤系统

MATLAB人脸考勤系统课题介绍 该课题为基于MATLAB平台的人脸识别系统。传统的人脸识别都是直接人头的比对,现实意义不大,没有一定的新意。该课题识别原理为:先采集待识别人员的人脸,进行训练,得到人脸特征值。测试的时…

Http 状态码 301 Permanent Rediret 302 Temporary Redirect、 重定向 重写

HTTP状态码301和302是什么? 1、HTTP状态码301 HTTP状态码301表示永久性转移(Permanent Redirect),这意味着请求的资源已经被分配了一个新的URI,以后的引用应该使用资源现在所指的URI。 HTTP 301状态码表示请求的资源…

如何用猿大师办公助手实现OA系统中Word公文/合同在线编辑及流转?

在OA系统或者合同管理系统中,我们会经常遇到网页在线编辑Word文档形式的公文及合同的情况,并且需要上级对下级的公文进行批注等操作,或者不同部门的人需要签字审核,这就需要用到文档流转功能,如何用猿大师办公助手实现…