Flink源码解析之:如何根据JobGraph生成ExecutionGraph

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。

对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会讲述JobGraph到ExecutionGraph的生成过程,而这一过程会在Flink JobManager的服务端来完成。当JobGraph从客户端提交到JobManager后,JobManager会根据JobGraph生成对应的ExecutionGraph,而ExecutionGraph就是Flink作业调度时使用的核心数据结构。 本篇将会详细介绍JobGraph转换为ExecutionGraph的流程。

主体流程梳理

Flink在将JobGraph转换成ExecutionGraph后,便可以开始执行真正的任务。这一转换流程主要在Flink源码中的DefaultExecutionGraphBuilder类中的buildGraph方法中实现的。在转换过程中,涉及到了一些新的基本概念,先来简单介绍一下这些概念,对于理解ExecutionGraph有较大的帮助:

  • ExecutionJobVertex: 在ExecutionGraph中表示执行顶点,与JobGraph中的JobVertex一一对应。实际上,每个ExecutionJobVertex也是依赖JobVertex来创建的。
  • ExecutionVertex: 在ExecutionJobVertex类中创建,每个并发度都对应了一个ExecutionVertex对象,每个ExecutionVertex都代表JobVertex在某个特定并行子任务中的执行。在实际执行时,每个ExecutionVertex实际上就是一个Task,是ExecutionJobVertex并行执行的一个子任务。
  • Execution: Execution表示ExecutionVertex的一次执行。由于ExecutionVertex可以被执行多次(用于恢复、重新计算、重新分配),这个类用于跟踪该ExecutionVertex的单个执行状态和资源。
  • IntermediateResult: 在JobGraph中用IntermediateDataSet表示上游JobVertex的输出数据流,而在ExecutionGraph中,则用IntermediateResult来表示ExecutionJobVertex的输出数据流。
  • IntermediateResultPartition:这是IntermediateResult的一部分或一个分片。由于有多个并行任务(ExecutionVertex)执行相同的操作,每个任务都会产生一部分IntermediateResult。这些结果在物理存储和计算过程中,可能会被进一步划分成多个分区,每个分区对应一个 IntermediateResultPartition对象。

从上面的基本概念也可以看出,在ExecutionGraph中:

  • 相比StreamGraph和JobGraph,ExecutionGraph是实际根据任务并行度来生成拓扑结构的,在ExecutionGraph中,每个并行子任务都对应一个ExecutionVertex顶点和IntermediateResultPartition输出数据流分区。
  • 在ExecutionGraph中,上下游节点之间的连接是通过ExecutionVertex -> IntermediateResultPartition -> ExecutionVertex 对象来完成的。

整体的执行流程图如下所示:
在这里插入图片描述

入口方法:DefaultExecutionGraphBuilder.buildGraph

ExecutionGraph的生成是在DefaultExecutionGraphBuilder类的buildGraph方法中实现的:

public class DefaultExecutionGraphBuilder {public static DefaultExecutionGraph buildGraph(JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,ClassLoader classLoader,CompletedCheckpointStore completedCheckpointStore,CheckpointsCleaner checkpointsCleaner,CheckpointIDCounter checkpointIdCounter,Time rpcTimeout,MetricGroup metrics,BlobWriter blobWriter,Logger log,ShuffleMaster<?> shuffleMaster,JobMasterPartitionTracker partitionTracker,TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,ExecutionDeploymentListener executionDeploymentListener,ExecutionStateUpdateListener executionStateUpdateListener,long initializationTimestamp,VertexAttemptNumberStore vertexAttemptNumberStore,VertexParallelismStore vertexParallelismStore)throws JobExecutionException, JobException {checkNotNull(jobGraph, "job graph cannot be null");final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();// 创建JobInformationfinal JobInformation jobInformation =new JobInformation(jobId,jobName,jobGraph.getSerializedExecutionConfig(),jobGraph.getJobConfiguration(),jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths());// Execution 保留的最大历史数final int maxPriorAttemptsHistoryLength =jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);// IntermediateResultPartitions的释放策略final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(jobManagerConfig);// create a new execution graph, if none exists so farfinal DefaultExecutionGraph executionGraph;try {// 创建默认的ExecutionGraph执行图对象,最后会返回该创建好的执行图对象executionGraph =new DefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,maxPriorAttemptsHistoryLength,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,partitionLocationConstraint,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore);} catch (IOException e) {throw new JobException("Could not create the ExecutionGraph.", e);}// set the basic propertiestry {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));} catch (Throwable t) {log.warn("Cannot create JSON plan for job", t);// give the graph an empty planexecutionGraph.setJsonPlan("{}");}// initialize the vertices that have a master initialization hook// file output formats create directories here, input formats create splitsfinal long initMasterStart = System.nanoTime();log.info("Running initialization on master for job {} ({}).", jobName, jobId);for (JobVertex vertex : jobGraph.getVertices()) {String executableClass = vertex.getInvokableClassName();if (executableClass == null || executableClass.isEmpty()) {throw new JobSubmissionException(jobId,"The vertex "+ vertex.getID()+ " ("+ vertex.getName()+ ") has no invokable class.");}try {vertex.initializeOnMaster(classLoader);} catch (Throwable t) {throw new JobExecutionException(jobId,"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),t);}}log.info("Successfully ran initialization on master in {} ms.",(System.nanoTime() - initMasterStart) / 1_000_000);// topologically sort the job vertices and attach the graph to the existing one// 这里会先做一个排序,source源节点会放在最前面,接着开始遍历// 必须保证当前添加到集合的节点的前置节点都已经添加进去了List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();if (log.isDebugEnabled()) {log.debug("Adding {} vertices from job graph {} ({}).",sortedTopology.size(),jobName,jobId);}// 构建执行图的重点方法。生成具体的ExecutionGraphexecutionGraph.attachJobGraph(sortedTopology);if (log.isDebugEnabled()) {log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);}// configure the state checkpointing// checkpoint的相关配置if (isCheckpointingEnabled(jobGraph)) {JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();// Maximum number of remembered checkpointsint historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);CheckpointStatsTracker checkpointStatsTracker =new CheckpointStatsTracker(historySize,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// load the state backend from the application settingsfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured =snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;} else {try {applicationConfiguredBackend =serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);}}// StateBackend配置final StateBackend rootBackend;try {rootBackend =StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend,snapshotSettings.isChangelogStateBackendEnabled(),jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// load the checkpoint storage from the application settingsfinal CheckpointStorage applicationConfiguredStorage;final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =snapshotSettings.getDefaultCheckpointStorage();if (serializedAppConfiguredStorage == null) {applicationConfiguredStorage = null;} else {try {applicationConfiguredStorage =serializedAppConfiguredStorage.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined checkpoint storage.",e);}}final CheckpointStorage rootStorage;try {rootStorage =CheckpointStorageLoader.load(applicationConfiguredStorage,null,rootBackend,jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);}// instantiate the user-defined checkpoint hooks// 示例化用户自定义的cp hookfinal SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;if (serializedHooks == null) {hooks = Collections.emptyList();} else {final MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}final Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}} finally {thread.setContextClassLoader(originalClassLoader);}}final CheckpointCoordinatorConfiguration chkConfig =snapshotSettings.getCheckpointCoordinatorConfiguration();// 创建CheckpointCoordinator对象executionGraph.enableCheckpointing(chkConfig,hooks,checkpointIdCounter,completedCheckpointStore,rootBackend,rootStorage,checkpointStatsTracker,checkpointsCleaner);}// create all the metrics for the Execution Graph// 添加metrics指标metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));return executionGraph;}

在这个方法里,会先创建一个 ExecutionGraph 对象,然后对 JobGraph 中的 JobVertex 列表做一下排序(先把有 source 节点的 JobVertex 放在最前面,然后开始遍历,只有当前 JobVertex 的前置节点都已经添加到集合后才能把当前 JobVertex 节点添加到集合中),最后通过过 attachJobGraph() 方法生成具体的ExecutionGraph。

在上面的代码中,最需要核心关注的方法是:executionGraph.attachJobGraph(sortedTopology);。该方法是创建ExecutionGraph的核心方法,包括了创建上面我们说的各种ExecutionGraph中涉及的对象,以及连接它们来形成ExecutionGraph拓扑结构。

接下来我们进入该方法来一探究竟。

生成ExecutionGraph:attachJobGraph

先来看下attachJobGraph方法的实现:

public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+ "vertices and {} intermediate results.",topologicallySorted.size(),tasks.size(),intermediateResults.size());final long createTimestamp = System.currentTimeMillis();// 遍历排序好的拓扑JobVertexfor (JobVertex jobVertex : topologicallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable = false;}// 获取节点并行度信息VertexParallelismInformation parallelismInfo =parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graph// 创建ExecutionJobVertexExecutionJobVertex ejv =new ExecutionJobVertex(this,jobVertex,maxPriorAttemptsHistoryLength,rpcTimeout,createTimestamp,parallelismInfo,initialAttemptCounts.getAttemptCounts(jobVertex.getID()));// 重要方法!!!// 构建ExecutionGraph,连接上下游节点ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask != null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(), ejv, previousTask));}// 遍历ExecutionJobVertex的输出IntermediateResultfor (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet =this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet != null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal += ejv.getParallelism();}//将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);// the topology assigning should happen before notifying new vertices to failoverStrategy// 转换执行拓扑executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy =// 创建部分组释放策略的方法,依赖于当前的调度的拓扑结构,这决定了当何时释放特定的中间数据结果所需的策略。
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

在上面attchGraph方法中,首先遍历输入的排序后的JobVertex列表,对每一个JobVertex:

  • 判断是否停止: 对于单个 JobVertex,如果它是一个输入顶点且不可停止,则整个 Job 不可停止。这在流处理任务中是常见的,一些输入数据源可能无法停止(如Kafka)。
  • 获取并行信息并创建执行的顶点: 根据JobVertex的ID,从parallelismStore中获取并行信息。利用这些信息创建ExecutionJobVertex实例,它代表运行在特定TaskManager上的taskId,可以是待调度、运行或已完成的。
  • 判断新添加的顶点是否已经存在: 如果试图添加一个已经存在的顶点,这意味着存在程序错误,因为每个JobVertex应当有唯一的ID。这将抛出异常。
  • 判断数据集是否已经存在: 同样。如果试图添加一个已经存在的IntermediateResult,这将抛出异常。
  • 添加执行顶点到创建顺序列表和增加总的顶点数量: 记录创建顶点的顺序能够确保在执行时能够按照正确的依赖关系进行。并同时更新总的顶点数量。

遍历完成后, 注册执行顶点和结果分区,将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。

利用DefaultExecutionTopology工具类将ExecutionGraph转换为SchedulingTopology,这样便于任务调度器进行处理。

最后,调用partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology())根据当前的调度的拓扑结构来创建组释放策略,这决定了当何时释放特定的中间数据结果所需的策略。

上面流程中,最需要关注的方法就是new ExecutionJobVertexejv.connectToPredecessors(this.intermediateResults);

接下来,我们分别对其进行探究。

创建 ExecutionJobVertex 对象

进入到该方法的源码中:

@VisibleForTesting
public ExecutionJobVertex(InternalExecutionGraphAccessor graph,JobVertex jobVertex,int maxPriorAttemptsHistoryLength,Time timeout,long createTimestamp,VertexParallelismInformation parallelismInfo,SubtaskAttemptNumberStore initialAttemptCounts)throws JobException {if (graph == null || jobVertex == null) {throw new NullPointerException();}this.graph = graph;this.jobVertex = jobVertex;this.parallelismInfo = parallelismInfo;// verify that our parallelism is not higher than the maximum parallelismif (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",jobVertex.getName(),this.parallelismInfo.getParallelism(),this.parallelismInfo.getMaxParallelism()));}this.resourceProfile =ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];this.inputs = new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());this.coLocationGroup = jobVertex.getCoLocationGroup();// create the intermediate resultsthis.producedDataSets =new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] =new IntermediateResult(result.getId(),this,this.parallelismInfo.getParallelism(),result.getResultType());}// create all task verticesfor (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {ExecutionVertex vertex =new ExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,maxPriorAttemptsHistoryLength,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i] = vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor (IntermediateResult ir : this.producedDataSets) {if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");}}final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =getJobVertex().getOperatorCoordinators();if (coordinatorProviders.isEmpty()) {this.operatorCoordinators = Collections.emptyList();} else {final ArrayList<OperatorCoordinatorHolder> coordinators =new ArrayList<>(coordinatorProviders.size());try {for (final SerializedValue<OperatorCoordinator.Provider> provider :coordinatorProviders) {coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));}} catch (Exception | LinkageError e) {IOUtils.closeAllQuietly(coordinators);throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);}this.operatorCoordinators = Collections.unmodifiableList(coordinators);}// set up the input splits, if the vertex has anytry {@SuppressWarnings("unchecked")InputSplitSource<InputSplit> splitSource =(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();if (splitSource != null) {Thread currentThread = Thread.currentThread();ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try {inputSplits =splitSource.createInputSplits(this.parallelismInfo.getParallelism());if (inputSplits != null) {splitAssigner = splitSource.getInputSplitAssigner(inputSplits);}} finally {currentThread.setContextClassLoader(oldContextClassLoader);}} else {inputSplits = null;}} catch (Throwable t) {throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);}
}

在上面这段代码中,主要实现了ExecutionVertex的创建和IntermediateResult对象的创建:

  • 遍历当前JobVertex的输出IntermediateDataSet列表,并根据IntermediateDataSet来创建相应的IntermediateResult对象。每个IntermediateDataSet都会对应一个IntermediateResult
  • 根据当前JobVertex的并发度,来创建相同数量的ExecutionVertex对象,每个ExecutionVertex对象代表一个并行计算任务,在实际执行时就是一个Task任务。

创建ExecutionVertex对象

进一步地,我们观察创建ExecutionVertex对象的实现逻辑如下所示:

public ExecutionVertex(ExecutionJobVertex jobVertex,int subTaskIndex,IntermediateResult[] producedDataSets,Time timeout,long createTimestamp,int maxPriorExecutionHistoryLength,int initialAttemptCount) {this.jobVertex = jobVertex;this.subTaskIndex = subTaskIndex;this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);this.taskNameWithSubtask =String.format("%s (%d/%d)",jobVertex.getJobVertex().getName(),subTaskIndex + 1,jobVertex.getParallelism());this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);// 根据IntermediateResult创建当前subTaskIndex分区下的IntermediateResultPartitonfor (IntermediateResult result : producedDataSets) {IntermediateResultPartition irp =new IntermediateResultPartition(result,this,subTaskIndex,getExecutionGraphAccessor().getEdgeManager());// 记录当前分区的irp到ir中result.setPartition(subTaskIndex, irp);// 记录分区ip与irp的对应关系resultPartitions.put(irp.getPartitionId(), irp);}this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);// 创建对应的Execution对象,初始化时initialAttempCount为0,如果后面重新调度这个task,它会自增加1this.currentExecution =new Execution(getExecutionGraphAccessor().getFutureExecutor(),this,initialAttemptCount,createTimestamp,timeout);getExecutionGraphAccessor().registerExecution(currentExecution);this.timeout = timeout;this.inputSplits = new ArrayList<>();
}

上述创建ExecutionVertex的过程主要实现了以下步骤:

  1. 生成中间结果分区IntermediateResultPartition
    中间结果分区代表一个并行任务产生的输出,同一并行任务可能会有多个输出(对应多个后续任务),也就是多个中间结果分区。
  • 基于 result,在相应的索引 subTaskIndex 上创建一个 IntermediateResultPartition 并给它赋值。IntermediateResultPartition 提供了并行任务的输出数据,对应于某个特定执行顶点 ExecutionVertex 的并行子任务。
  • 在创建过程中,需要使用 getExecutionGraphAccessor().getEdgeManager() 获取边管理器,边管理器是用于维护这个分区与其它 ExecutionVertex 之间的连接关系。
  • 记录这个 IntermediateResultPartition 到 result 中的相应索引位置,并在 resultPartitions 映射表中保存 IntermediateResultPartition。
  1. 创建执行(Execution)对象:
    这一过程是基于 Execution 的构造函数引发的。它用于代表该 ExecutionVertex 在某一特定点时间的一次尝试执行。创建 Execution 实例后,会将其注册到执行图(ExecutionGraph)中,以便于后续调度和执行任务。

通过以上流程,生成了中间结果分区,映射了每一个分区和其对应的任务关系,并且创建了 Execution 对象用于管理并跟踪任务的执行状态。

在创建好ExecutionVertex和IntermediateResultPartition后,根据上面的流程图,就是考虑如何将它们进行连接生成ExecutionGraph了。

这部分的实现逻辑就在attachJobGraph方法的ejv.connectToPredecessors(this.intermediateResults);方法中实现的。

生成ExecutionGraph

同样地,我们进入源码来深入观察一下实现逻辑:

public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)throws JobException {List<JobEdge> inputs = jobVertex.getInputs();if (LOG.isDebugEnabled()) {LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.",jobVertex.getID(), jobVertex.getName(), inputs.size()));}for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);if (LOG.isDebugEnabled()) {if (edge.getSource() == null) {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",num,jobVertex.getID(),jobVertex.getName(),edge.getSourceId()));} else {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",num,jobVertex.getID(),jobVertex.getName(),edge.getSource().getProducer().getID(),edge.getSource().getProducer().getName()));}}// fetch the intermediate result via ID. if it does not exist, then it either has not// been created, or the order// in which this method is called for the job vertices is not a topological orderIntermediateResult ires = intermediateDataSets.get(edge.getSourceId());if (ires == null) {throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "+ edge.getSourceId());}this.inputs.add(ires);EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());}
}

这段代码主要完成了将当前的ExecutionJobVertex与其前置任务(predecessors)连接的流程。传入的参数intermediateDatasets包含了JobGraph中所有的中间计算结果,这些结果是由上游前置任务产生的。

需要注意的是,该过程要求连接操作的执行顺序应遵循任务的拓扑顺序。Flink的计算任务通常由多个阶段组成,每个阶段的输出是下一个阶段的输入,每个阶段(JobVertex)都处理一种类型的计算,例如map或reduce。

流程大致如下:

  • 获取输入: 首先获取jobVertex的输入,输入是JobEdge列表,每一条JobEdge都代表一个上游产生的中间数据集和连接上下游的方式(例如HASH, BROADCAST)。
  • 循环处理每个输入: 然后遍历这些inputs,对于每一条JobEdge:
    • 基于edge.getSourceId()从intermediateDatasets获取IntermediateResult,这是一个中间计算结果。
    • 检查该中间结果是否存在,如果不存在,则表示这不是一个拓扑排序,因为预期的情况是当你尝试访问一个中间结果时,它应该已经被创建了。如果找不到,抛出一个异常。
    • 如果存在(没有异常被抛出),将找到的IntermediateResult添加到ExecutionJobVertex的inputs(List类型)中,这样当前任务就知道它的输入来自哪些中间结果。
    • 调用EdgeManagerBuildUtil.connectVertexToResult方法来建立当前ExecutionJobVertex与找到的IntermediateResult之间的连接。 EdgeManager是Flink中负责管理输入输出边的组件,它显示地记录了发送端的分区和接收端的分区对应关系。

这个流程重要的是建立了Job中每个任务的执行依赖关系,并明确了数据传输的方式,让任务在执行时清楚自己的输入来自哪里,当任务执行完成后,它产生的输出会通过何种方式被发送到哪些任务。

具体的连接方式,我们需要继续进入到EdgeManagerBuildUtil.connectVertexToResult方法中。其源码如下所示:

/*** Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult} ** based on the {@link DistributionPattern}.** @param vertex the downstream consumer {@link ExecutionJobVertex}* @param intermediateResult the upstream consumed {@link IntermediateResult}* @param distributionPattern the {@link DistributionPattern} of the edge that connects the*     upstream {@link IntermediateResult} and the downstream {@link IntermediateResult}*/
static void connectVertexToResult(ExecutionJobVertex vertex,IntermediateResult intermediateResult,DistributionPattern distributionPattern) {switch (distributionPattern) {// 点对点的连接方式case POINTWISE:connectPointwise(vertex.getTaskVertices(), intermediateResult);break;// 全连接的方式case ALL_TO_ALL:connectAllToAll(vertex.getTaskVertices(), intermediateResult);break;default:throw new IllegalArgumentException("Unrecognized distribution pattern.");}
}

会根据DistributionPattern选择不同的连接方式,这里主要分两种情况,DistributionPattern是跟Partitioner的配置有关。

这里以POINTWISE的连接方式来举例,看一下其是如何在构造ExecutionGraph时连接上下游节点的。

private static void connectPointwise(ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {final int sourceCount = intermediateResult.getPartitions().length;final int targetCount = taskVertices.length;if (sourceCount == targetCount) {for (int i = 0; i < sourceCount; i++) {ExecutionVertex executionVertex = taskVertices[i];IntermediateResultPartition partition = intermediateResult.getPartitions()[i];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());partition.addConsumers(consumerVertexGroup);ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else if (sourceCount > targetCount) {for (int index = 0; index < targetCount; index++) {ExecutionVertex executionVertex = taskVertices[index];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());int start = index * sourceCount / targetCount;int end = (index + 1) * sourceCount / targetCount;List<IntermediateResultPartitionID> consumedPartitions =new ArrayList<>(end - start);for (int i = start; i < end; i++) {IntermediateResultPartition partition = intermediateResult.getPartitions()[i];partition.addConsumers(consumerVertexGroup);consumedPartitions.add(partition.getPartitionId());}ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(consumedPartitions, intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else {for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {IntermediateResultPartition partition =intermediateResult.getPartitions()[partitionNum];ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;List<ExecutionVertexID> consumers = new ArrayList<>(end - start);for (int i = start; i < end; i++) {ExecutionVertex executionVertex = taskVertices[i];executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);consumers.add(executionVertex.getID());}ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromMultipleVertices(consumers);partition.addConsumers(consumerVertexGroup);}}
}

上面这段代码的目的是通过“点对点”的方式(即每个生产者产生的数据只被一个消费者消费)建立任务节点(ExecutionVertex)与中间结果集(IntermediateResultPartition)之间的连接关系。

这个方法的逻辑主要是根据上游任务产生的IntermediateResultPartition的数量(源)和下游ExecutionVertex节点数量(目标)的比例关系,做不同的操作:

  • 源和目标数量相等:方法会将每个源中间结果分区与对应的下游ExecutionVertex节点连接。这种情况下,每个任务都完全独立,只会消费一个特定的上游中间结果分区
  • 源数量大于目标数量:源中间结果分区会被尽可能平均地分配给下游ExecutionVertex节点,即每个ExecutionVertex可能会消费多个源中间结果分区数据。
  • 源数量小于目标数量:每个源中间结果分区可能会被分配给多个下游ExecutionVertex节点消费,即多个ExecutionVertex节点可能消费同一个源中间结果分区数据。

⠀在执行连接的过程中,会创建ConsumerVertexGroup和ConsumedPartitionGroup:

  • ConsumerVertexGroup包含一组接收同一个中间结果分区(IntermediateResultPartition)的顶点集合。
  • ConsumedPartitionGroup包含顶点要消费的一组中间结果分区。

⠀注意,当源数量小于目标数量时,会有多个任务消费同一个源数据,所以需要使用ConsumerVertexGroup.fromMultipleVertices(consumers)来创建ConsumerVertexGroup。

几种连接情况的示例图如下所示:
在这里插入图片描述

到这里,这个作业的 ExecutionGraph 就创建完成了,有了 ExecutionGraph,JobManager 才能对这个作业做相应的调度。

总结

本文详细介绍了JobGraph生成ExecutionGraph的流程,介绍了ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition相关概念的原理和生成过程。最后我们介绍了Flink在生成ExecutionGraph时是如何实现IntermediateResultPartition和ExecutionVertex的连接的。

到这里,StreamGraph、JobGraph和Execution的生成过程,在最近的三篇文章中都已经详细讲解完成了,当然除了我们介绍的内容外,还有更多的实现细节没有介绍,有兴趣的读者可以参考文本来阅读源码,以此加深自己的理解和对更多实现细节的挖掘。

最后,再对StreamGraph、JobGraph和ExecutionGraph做一个总结:

  • StreamGraph. StreamGraph 是表示 Flink 流计算的图模型,它是用户定义的计算逻辑的内部表示形式,是最原始的用户逻辑,一个没有做任何优化的DataFlow;
  • JobGraph. JobGraph 由一个或多个 JobVertex 对象和它们之间的 JobEdge 对象组成,包含并行任务的信息。在JobGraph中对StreamGraph进行了优化,将能够合并在同个算子链中的操作符进行合并,以此减少任务执行时的上下文切换,提任务执行性能。
  • ExecutionGraph. ExecutionGraph是 JobGraph 的并发执行版本,由 ExecutionVertex 和 IntermediateResultPartition 组成。每个 JobVertex 会被转换为一个或多个 ExecutionVertex,ExecutorGraph 包含了每个任务的全部实例,包含它们的状态、位置、输入输出结果。ExecutionGraph 是 Flink 中最核心的部分,它用于任务的调度、失败恢复等。

参考:
https://matt33.com/2019/12/20/flink-execution-graph-4/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/500552.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode算法题——有序数组的平方

题目描述 给你一个按非递减顺序排序的整数数组nums&#xff0c;返回每个数字的平方组成的新数组&#xff0c;要求也按非递减顺序排序。 题解 解法一&#xff1a;暴力解法 思路&#xff1a; 该题目可通过暴力解法解决&#xff0c;即利用for循环遍历数组&#xff0c;对数组每…

【Python】FastAPI之SQLAlchemy、关联关系

第四节&#xff1a;SQLAlchemy操作数据库 一、SQLAlchemy介绍 SQLAlchemy 是一个功能强大且灵活的 Python SQL 工具包及对象关系映射&#xff08;ORM&#xff09;库&#xff0c;它提供了全面的数据库访问抽象层。通过 SQLAlchemy&#xff0c;开发者可以使用 Python 代码来定义…

GRAPE——RLAIF微调VLA模型:通过偏好对齐提升机器人策略的泛化能力(含24年具身模型汇总)

前言 过去的这两年&#xff0c;工作之余&#xff0c;我狂写大模型与具身的文章&#xff0c;加之具身大火&#xff0c;每周都有各种朋友通过CSDN私我及我司「七月在线」寻求帮助/指导(当然&#xff0c;也欢迎各大开发团队与我司合作共同交付&#xff09;&#xff1a; 要么是做…

基于 LangChain 实现数据库问答机器人

基于 LangChain 实现数据库问答机器人 一、简介二、应用场景三、实战案例1、需求说明2、实现思路3、对应源码 一、简介 在 Retrieval 或者 ReACT 的一些场景中&#xff0c;常常需要数据库与人工智能结合。而 LangChain 本身就封装了许多相关的内容&#xff0c;在其官方文档-SQ…

Kali 自动化换源脚本编写与使用

1. 背景与需求 在使用 Kali Linux 的过程中&#xff0c;软件源的配置对系统的更新与软件安装速度至关重要。 Kali 的默认官方源提供了安全且最新的软件包&#xff0c;但有时由于网络条件或地理位置的限制&#xff0c;使用官方源可能会出现速度较慢的问题。 为了解决这一问题&a…

1Panel自建RustDesk服务器方案实现Windows远程macOS

文章目录 缘起RustDesk 基本信息实现原理中继服务器的配置建议 中继服务器自建指南准备服务器安装1Panel安装和配置 RustDesk 中继服务防火墙配置和安全组配置查看key下载&安装&配置客户端设置永久密码测试连接 macOS安装客户端提示finder写入失败hbbs和hbbr说明**hbbs…

Maple软件的安装和使用

文章目录 1.前言说明2.我为什么要学习Maple3.软件的安装4.如何使用4.1基本的赋值语句4.2函数的定义4.3三个类型的书写介质 5.指数运算5.1使用面板5.2自己输入 6.对数的使用 1.前言说明 众所周知&#xff0c;我虽然是一名这个计算机专业的学生&#xff0c;但是我对于数学&#…

Nacos配置中心总结

Nacos配置中心总结 Nacos配置文件的加载顺序和优先级 加载顺序 nacos作为配置中心时&#xff0c;需要在bootstrap.yml文件中添加nacos config相关的配置&#xff0c;这样系统启动时就能先去拉取nacos server上的配置了。拉取过来后会和本地配置文件进行合并。 bootstrap.ym…

Java开发-后端请求成功,前端显示失败

文章目录 报错解决方案1. 后端未配置跨域支持2. 后端响应的 Content-Type 或 CORS 配置问题3. 前端 request 配置问题4. 浏览器缓存或代理问题5. 后端端口未被正确映射 报错 如下图&#xff0c;后端显示请求成功&#xff0c;前端显示失败 解决方案 1. 后端未配置跨域支持 …

springboot523基于Spring Boot的大学校园生活信息平台的设计与实现(论文+源码)_kaic

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本大学校园生活信息平台就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据…

【Ubuntu使用技巧】Ubuntu22.04无人值守Crontab工具实战详解

一个愿意伫立在巨人肩膀上的农民...... Crontab是Linux和类Unix操作系统下的一个任务调度工具&#xff0c;用于周期性地执行指定的任务或命令。Crontab允许用户创建和管理计划任务&#xff0c;以便在特定的时间间隔或时间点自动运行命令或脚本。这些任务可以按照分钟、小时、日…

Linux(14)——网络管理

目录 一、检测网络配置&#xff1a; 1、查看网络接口&#xff08;ip&#xff09;&#xff1a; 2、查看性能&#xff08;ip&#xff09;&#xff1a; 3、查看 IP 地址&#xff08;ip&#xff09;&#xff1a; 4、查看路由表&#xff08;ip&#xff09;&#xff1a; 5、追踪…

《机器学习》——线性回归模型

文章目录 线性回归模型简介一元线性回归模型多元线性回归模型误差项分析一元线性模型实例完整代码 多元线性模型实例完整代码 线性回归模型简介 线性回归是利用数理统计中回归分析&#xff0c;来确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法。 相关关系&…

GeoTrust True BusinessID Wildcard

GeoTrust由DigiCert 提供支持&#xff0c;是最受信任和尊重的品牌之一&#xff0c;以提供高保证的网站安全而闻名。 GeoTrust True BusinessID通配符证书 – 以低成本保护多个主机名。即使将其用于您的公司主页或电子邮件服务器主机名&#xff0c;保护所有敏感信息也是您的目标…

华为配置 之 链路聚合

简介&#xff1a; 链路聚合&#xff08;Link Aggregation&#xff09;是一种计算机网络技术&#xff0c;通过将多个物理端口汇聚在一起&#xff0c;形成一个逻辑端口&#xff0c;以实现出/入流量吞吐量在各成员端口的负荷分担。当交换机检测到其中一个成员端口的链路发生故障时…

Angular Firebase CRUD 项目推荐

Angular Firebase CRUD 项目推荐 angular-firebase-crud Angular CRUD with Firebase using cloud firestore as a database, angular material and Bootstrap 4. Step by Step tutorial and working angular 7 example app. 项目地址: https://gitcode.com/gh_mirrors/an/an…

SqlSession的线程安全问题源码分析

&#x1f3ae; 作者主页&#xff1a;点击 &#x1f381; 完整专栏和代码&#xff1a;点击 &#x1f3e1; 博客主页&#xff1a;点击 文章目录 SqlSession 是线程安全的吗&#xff1f;为什么说是线程不安全的&#xff1f;事务管理问题 数据库连接的共享问题 一级缓存线程安全问题…

gitlab的搭建及使用

1、环境准备 服务器准备 CentOS Linux release 7.9.2009 (Core)&#xff0c;内存至少4G。 修改主机名和配置ip地址 hostnamectl set-hostname <hostname> 关闭主机的防火墙 # 关闭防火墙 systemctl stop firewalld #临时关闭防火墙 systemctl disable firewalld …

【面试系列】深入浅出 Spring Boot

熟悉SpringBoot&#xff0c;对常用注解、自动装配原理、Jar启动流程、自定义Starter有一定的理解&#xff1b; 面试题 Spring Boot 的核心注解是哪个&#xff1f;它主要由哪几个注解组成的&#xff1f;Spring Boot的自动配置原理是什么&#xff1f;你如何理解 Spring Boot 配置…

2024国城杯 Web

这四道题目Jasper大佬都做了镜像可以直接拉取进行复现 https://jaspersec.top/2024/12/16/0x12%20%E5%9B%BD%E5%9F%8E%E6%9D%AF2024%20writeup%20with%20docker/ n0ob_un4er 这道题没有复现成功, 不知道为啥上传了文件, 也在 /tmp目录下生成了sess_PHPSESSID的文件, 但是就是…