Apache SeaTunnel Zeta Engine Source Code Analysis (Part 3): How the Server Executes a Received Job

Author: Liu Naijie

Edited by: Zeng Hui

Introduction

This series is based on Apache SeaTunnel 2.3.6 and walks through the full lifecycle of a job on the Zeta engine, from submission to execution. We hope it gives readers who are just getting started with SeaTunnel a helpful head start.


The series is split into three parts, covering:

  1. Server-side initialization
  2. Client-side job submission flow
  3. Server-side execution flow after a job is received

Because the source-code analysis involved is fairly lengthy, the end-to-end flow of a job is recorded across this series of articles.

References

  • [ST-Engine][Design] The Design of LogicalPlan to PhysicalPlan: https://github.com/apache/seatunnel/issues/2269

About the Author

Hi, I'm Liu Naijie, a big data development engineer. I have been contributing to Apache SeaTunnel for over a year, submitting a number of PRs along with some features I find quite interesting, including Avro file format support, nested-structure queries in SQL Transform, and node tags for resource isolation. Feel free to reach out to me to discuss any of these.

Previous articles in this series: Apache SeaTunnel Zeta Engine Source Code Analysis (Part 1) Server-Side Initialization, and Apache SeaTunnel Zeta Engine Source Code Analysis (Part 2) Client-Side Job Submission Flow.

Server-Side Job Submission

This is the final article of the series. Let's first recap the components that run once the server has started:

  • CoordinatorService: enabled only on master/standby nodes; it monitors the cluster state and handles master/standby switchover
  • SlotService: enabled on worker nodes; it periodically reports the node's own information to the master
  • TaskExecutionService: enabled on worker nodes; it periodically updates the metrics of running tasks into the IMap

These components run even while the cluster has not received any job. So when a client sends a SeaTunnelSubmitJobCodec message to the server, how does the server handle it?

Receiving the Message

Because the client and the server run on different machines, direct method calls are impossible, so message passing is used instead. How, then, does the server turn a received message into the corresponding method call? From the previous article we know the client sends a message of type SeaTunnelSubmitJobCodec to the server:

```java
// Client-side code
ClientMessage request =
        SeaTunnelSubmitJobCodec.encodeRequest(
                jobImmutableInformation.getJobId(),
                seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation),
                jobImmutableInformation.isStartWithSavePoint());
PassiveCompletableFuture<Void> submitJobFuture =
        seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
```

Stepping into the SeaTunnelSubmitJobCodec class and looking at its callers, we find a class named SeaTunnelMessageTaskFactoryProvider, which maintains a mapping from message types to MessageTask classes -- in other words, a mapping from client messages to their server-side handler classes. For SeaTunnelSubmitJobCodec, it returns the SubmitJobTask class:

```java
private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);

private void initFactories() {
    factories.put(
            SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) ->
                    new PrintMessageTask(clientMessage, node, connection));
    factories.put(
            SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
    .....
}
```

Looking at SubmitJobTask, we find that it in turn delegates to SubmitJobOperation:

```java
@Override
protected Operation prepareOperation() {
    return new SubmitJobOperation(
            parameters.jobId,
            parameters.jobImmutableInformation,
            parameters.isStartWithSavePoint);
}
```

In SubmitJobOperation we can see where the real call happens: the payload is handed to the CoordinatorService component via its submitJob method:


```java
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
    SeaTunnelServer seaTunnelServer = getService();
    return seaTunnelServer
            .getCoordinatorService()
            .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
}
```

At this point a client message has truly been turned into a server-side method call. Other operation types can be traced to their handler classes in the same way, so we will not repeat the exercise.
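The chain above -- codec, message task, operation -- follows a simple registry-dispatch pattern. Here is a minimal, self-contained sketch of that pattern; the class names are simplified stand-ins, not SeaTunnel's actual API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// A minimal sketch of the codec -> task dispatch pattern described above.
// All names here are simplified stand-ins, not SeaTunnel's real classes.
public class MessageDispatchSketch {
    static class ClientMessage {
        final int messageType;
        ClientMessage(int messageType) { this.messageType = messageType; }
    }

    interface MessageTask {
        void run(); // a server-side handler created for one incoming message
    }

    private final Map<Integer, Function<ClientMessage, MessageTask>> factories = new HashMap<>();

    void register(int messageType, Function<ClientMessage, MessageTask> factory) {
        factories.put(messageType, factory);
    }

    void onMessage(ClientMessage msg) {
        // Look up the handler factory by message type id, build the task, and execute it
        factories.get(msg.messageType).apply(msg).run();
    }

    public static void main(String[] args) {
        MessageDispatchSketch dispatcher = new MessageDispatchSketch();
        // Analogous to factories.put(SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE, ...)
        dispatcher.register(1, msg -> () -> System.out.println("handling submit-job message"));
        dispatcher.onMessage(new ClientMessage(1));
    }
}
```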

CoordinatorService

Next, let's look at how CoordinatorService submits a job:

```java
public PassiveCompletableFuture<Void> submitJob(
        long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
    CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

    // First check by job id: if a job with the same id is already running, return directly
    if (getJobMaster(jobId) != null) {
        logger.warning(
                String.format("The job %s is currently running; no need to submit again.", jobId));
        jobSubmitFuture.complete(null);
        return new PassiveCompletableFuture<>(jobSubmitFuture);
    }
    // Create the JobMaster instance
    JobMaster jobMaster =
            new JobMaster(
                    jobImmutableInformation,
                    this.nodeEngine,
                    executorService,
                    getResourceManager(),
                    getJobHistoryService(),
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    ownedSlotProfilesIMap,
                    runningJobInfoIMap,
                    metricsImap,
                    engineConfig,
                    seaTunnelServer);
    executorService.submit(
            () -> {
                try {
                    // Since 2.3.6 the job id can be supplied by the user, and SeaTunnel uses
                    // the job id for some state checks, so this check guarantees that no job
                    // with the same id exists in the current state
                    if (!isStartWithSavePoint
                            && getJobHistoryService().getJobMetrics(jobId) != null) {
                        throw new JobException(
                                String.format(
                                        "The job id %s has already been submitted and is not starting with a savepoint.",
                                        jobId));
                    }
                    // Put this job's information into the IMap
                    runningJobInfoIMap.put(
                            jobId,
                            new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
                    runningJobMasterMap.put(jobId, jobMaster);
                    // Initialize the JobMaster
                    jobMaster.init(
                            runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
                    // Once JobMaster initialization completes, the job is considered created
                    jobSubmitFuture.complete(null);
                } catch (Throwable e) {
                    String errorMsg = ExceptionUtils.getMessage(e);
                    logger.severe(String.format("submit job %s error %s ", jobId, errorMsg));
                    jobSubmitFuture.completeExceptionally(new JobException(errorMsg));
                }
                if (!jobSubmitFuture.isCompletedExceptionally()) {
                    // After a successful submission, call jobMaster.run() to start the job,
                    // and finally check the job state and remove the job from internal state
                    try {
                        jobMaster.run();
                    } finally {
                        // voidCompletableFuture will be cancelled when zeta master node
                        // shutdown to simulate master failure,
                        // don't update runningJobMasterMap is this case.
                        if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                            runningJobMasterMap.remove(jobId);
                        }
                    }
                } else {
                    runningJobInfoIMap.remove(jobId);
                    runningJobMasterMap.remove(jobId);
                }
            });
    return new PassiveCompletableFuture<>(jobSubmitFuture);
}
```

As we can see, on the server side a JobMaster object is created for each job, and that object manages the single job.

When the JobMaster is created, it obtains the resource manager via getResourceManager() and the job history service via getJobHistoryService(). The jobHistoryService is created at startup, while the ResourceManager is lazily loaded, created only when the first job is submitted:

```java
/** Lazy load for resource manager */
public ResourceManager getResourceManager() {
    if (resourceManager == null) {
        synchronized (this) {
            if (resourceManager == null) {
                ResourceManager manager =
                        new ResourceManagerFactory(nodeEngine, engineConfig)
                                .getResourceManager();
                manager.init();
                resourceManager = manager;
            }
        }
    }
    return resourceManager;
}
```
ResourceManager

At present SeaTunnel supports only the standalone deployment mode. When the ResourceManager is initialized, it fetches all cluster members and sends each of them a SyncWorkerProfileOperation to obtain node information, which is then stored in the internal registerWorker state:

```java
@Override
public void init() {
    log.info("Init ResourceManager");
    initWorker();
}

private void initWorker() {
    log.info("initWorker... ");
    List<Address> aliveNode =
            nodeEngine.getClusterService().getMembers().stream()
                    .map(Member::getAddress)
                    .collect(Collectors.toList());
    log.info("init live nodes: {}", aliveNode);
    List<CompletableFuture<Void>> futures =
            aliveNode.stream()
                    .map(
                            node ->
                                    sendToMember(new SyncWorkerProfileOperation(), node)
                                            .thenAccept(
                                                    p -> {
                                                        if (p != null) {
                                                            registerWorker.put(
                                                                    node, (WorkerProfile) p);
                                                            log.info(
                                                                    "received new worker register: "
                                                                            + ((WorkerProfile) p)
                                                                                    .getAddress());
                                                        }
                                                    }))
                    .collect(Collectors.toList());
    futures.forEach(CompletableFuture::join);
    log.info("registerWorker: {}", registerWorker);
}
```

Recall from SlotService that every node periodically sends a heartbeat to the master carrying its current state. When the ResourceManager receives a heartbeat, it updates that node's state in its internal registry as well:

```java
@Override
public void heartbeat(WorkerProfile workerProfile) {
    if (!registerWorker.containsKey(workerProfile.getAddress())) {
        log.info("received new worker register: " + workerProfile.getAddress());
        sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
    } else {
        log.debug("received worker heartbeat from: " + workerProfile.getAddress());
    }
    registerWorker.put(workerProfile.getAddress(), workerProfile);
}
```

JobMaster

CoordinatorService creates the JobMaster and calls its init method; once init completes, the job is considered successfully created. The run method is then called to actually start the job.

Let's look at the constructor and the init method.

```java
public JobMaster(
        @NonNull Data jobImmutableInformationData,
        @NonNull NodeEngine nodeEngine,
        @NonNull ExecutorService executorService,
        @NonNull ResourceManager resourceManager,
        @NonNull JobHistoryService jobHistoryService,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull IMap ownedSlotProfilesIMap,
        @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
        @NonNull IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap,
        EngineConfig engineConfig,
        SeaTunnelServer seaTunnelServer) {
    this.jobImmutableInformationData = jobImmutableInformationData;
    this.nodeEngine = nodeEngine;
    this.executorService = executorService;
    flakeIdGenerator =
            this.nodeEngine
                    .getHazelcastInstance()
                    .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
    this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
    this.resourceManager = resourceManager;
    this.jobHistoryService = jobHistoryService;
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.runningJobInfoIMap = runningJobInfoIMap;
    this.engineConfig = engineConfig;
    this.metricsImap = metricsImap;
    this.seaTunnelServer = seaTunnelServer;
    this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
}
```

The constructor only assigns variables and performs no real work, so we focus on the init method:

```java
public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
    // The message the server received from the client is a binary object.
    // First deserialize it into a JobImmutableInformation object -- the very
    // object the client sent to the server
    jobImmutableInformation =
            nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
    // Read the checkpoint-related configuration, e.g. interval and timeout
    jobCheckpointConfig =
            createJobCheckpointConfig(
                    engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig());

    LOGGER.info(
            String.format(
                    "Init JobMaster for Job %s (%s) ",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId()));
    LOGGER.info(
            String.format(
                    "Job %s (%s) needed jar urls %s",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId(),
                    jobImmutableInformation.getPluginJarsUrls()));
    ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
    // Obtain the ClassLoader
    ClassLoader classLoader =
            seaTunnelServer
                    .getClassLoaderService()
                    .getClassLoader(
                            jobImmutableInformation.getJobId(),
                            jobImmutableInformation.getPluginJarsUrls());
    // Deserialize the information sent by the client into the logical plan
    logicalDag =
            CustomClassLoadedObject.deserializeWithCustomClassLoader(
                    nodeEngine.getSerializationService(),
                    classLoader,
                    jobImmutableInformation.getLogicalDag());
    try {
        Thread.currentThread().setContextClassLoader(classLoader);
        // The server executes the savemode functionality, e.g. creating or dropping tables
        if (!restart
                && !logicalDag.isStartWithSavePoint()
                && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
                        .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
                        .equals(SaveModeExecuteLocation.CLUSTER)) {
            logicalDag.getLogicalVertexMap().values().stream()
                    .map(LogicalVertex::getAction)
                    .filter(action -> action instanceof SinkAction)
                    .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
                    .forEach(JobMaster::handleSaveMode);
        }
        // Parse the logical plan into the physical plan
        final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
                PlanUtils.fromLogicalDAG(
                        logicalDag,
                        nodeEngine,
                        jobImmutableInformation,
                        initializationTimestamp,
                        executorService,
                        flakeIdGenerator,
                        runningJobStateIMap,
                        runningJobStateTimestampsIMap,
                        engineConfig.getQueueType(),
                        engineConfig);
        this.physicalPlan = planTuple.f0();
        this.physicalPlan.setJobMaster(this);
        this.checkpointPlanMap = planTuple.f1();
    } finally {
        // Restore the thread's ClassLoader and release the classLoader created above
        Thread.currentThread().setContextClassLoader(appClassLoader);
        seaTunnelServer
                .getClassLoaderService()
                .releaseClassLoader(
                        jobImmutableInformation.getJobId(),
                        jobImmutableInformation.getPluginJarsUrls());
    }
    Exception initException = null;
    try {
        // Initialize the checkpointManager
        this.initCheckPointManager(restart);
    } catch (Exception e) {
        initException = e;
    }
    // Register callbacks to monitor the job state
    this.initStateFuture();
    if (initException != null) {
        if (restart) {
            cancelJob();
        }
        throw initException;
    }
}
```

Finally, the run method:

```java
public void run() {
    try {
        physicalPlan.startJob();
    } catch (Throwable e) {
        LOGGER.severe(
                String.format(
                        "Job %s (%s) run error with: %s",
                        physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
                        physicalPlan.getJobImmutableInformation().getJobId(),
                        ExceptionUtils.getMessage(e)));
    } finally {
        jobMasterCompleteFuture.join();
        if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
            List<ConnectorJarIdentifier> pluginJarIdentifiers =
                    jobImmutableInformation.getPluginJarIdentifiers();
            seaTunnelServer
                    .getConnectorPackageService()
                    .cleanUpWhenJobFinished(
                            jobImmutableInformation.getJobId(), pluginJarIdentifiers);
        }
    }
}
```

This method is straightforward: it calls physicalPlan.startJob() to start the generated physical plan.

From the code above: when the server receives a job submission request, it initializes a JobMaster, which generates the physical plan from the logical plan and finally executes that physical plan.

Next we dig into how the physical plan is generated from the logical plan.

From Logical Plan to Physical Plan

The physical plan is generated from within JobMaster:

```java
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
        PlanUtils.fromLogicalDAG(
                logicalDag,
                nodeEngine,
                jobImmutableInformation,
                initializationTimestamp,
                executorService,
                flakeIdGenerator,
                runningJobStateIMap,
                runningJobStateTimestampsIMap,
                engineConfig.getQueueType(),
                engineConfig);
```

Inside this method we can see that the logical plan is first turned into an execution plan, and the execution plan is then turned into the physical plan:

```java
public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> fromLogicalDAG(
        @NonNull LogicalDag logicalDag,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType,
        @NonNull EngineConfig engineConfig) {
    return new PhysicalPlanGenerator(
                    new ExecutionPlanGenerator(
                                    logicalDag, jobImmutableInformation, engineConfig)
                            .generate(),
                    nodeEngine,
                    jobImmutableInformation,
                    initializationTimestamp,
                    executorService,
                    flakeIdGenerator,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    queueType)
            .generate();
}
```

Generating the Execution Plan
```java
public ExecutionPlanGenerator(
        @NonNull LogicalDag logicalPlan,
        @NonNull JobImmutableInformation jobImmutableInformation,
        @NonNull EngineConfig engineConfig) {
    checkArgument(
            logicalPlan.getEdges().size() > 0, "ExecutionPlan Builder must have LogicalPlan.");
    this.logicalPlan = logicalPlan;
    this.jobImmutableInformation = jobImmutableInformation;
    this.engineConfig = engineConfig;
}

public ExecutionPlan generate() {
    log.debug("Generate execution plan using logical plan:");

    Set<ExecutionEdge> executionEdges = generateExecutionEdges(logicalPlan.getEdges());
    log.debug("Phase 1: generate execution edge list {}", executionEdges);

    executionEdges = generateShuffleEdges(executionEdges);
    log.debug("Phase 2: generate shuffle edge list {}", executionEdges);

    executionEdges = generateTransformChainEdges(executionEdges);
    log.debug("Phase 3: generate transform chain edge list {}", executionEdges);

    List<Pipeline> pipelines = generatePipelines(executionEdges);
    log.debug("Phase 4: generate pipeline list {}", pipelines);

    ExecutionPlan executionPlan = new ExecutionPlan(pipelines, jobImmutableInformation);
    log.debug("Phase 5: generate execution plan: {}", executionPlan);

    return executionPlan;
}
```

First, let's see what an execution plan contains:

```java
public class ExecutionPlan {
    private final List<Pipeline> pipelines;
    private final JobImmutableInformation jobImmutableInformation;
}

public class Pipeline {
    /** The ID of the pipeline. */
    private final Integer id;
    private final List<ExecutionEdge> edges;
    private final Map<Long, ExecutionVertex> vertexes;
}

public class ExecutionEdge {
    private ExecutionVertex leftVertex;
    private ExecutionVertex rightVertex;
}

public class ExecutionVertex {
    private Long vertexId;
    private Action action;
    private int parallelism;
}
```

Compare this with the logical plan:

```java
public class LogicalDag implements IdentifiedDataSerializable {
    @Getter private JobConfig jobConfig;
    private final Set<LogicalEdge> edges = new LinkedHashSet<>();
    private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
    private IdGenerator idGenerator;
    private boolean isStartWithSavePoint = false;
}

public class LogicalEdge implements IdentifiedDataSerializable {
    private LogicalVertex inputVertex;
    private LogicalVertex targetVertex;
    private Long inputVertexId;
    private Long targetVertexId;
}

public class LogicalVertex implements IdentifiedDataSerializable {
    private Long vertexId;
    private Action action;
    private int parallelism;
}
```

Looking at the two sets of classes, each Pipeline looks much like a logical plan, so why is this conversion step needed? To answer that, let's walk through how the execution plan is generated. As shown above, it takes five phases, which we examine one by one.

  • Step 1: convert the logical plan into execution edges

```java
// The input is the logical plan's edges; each edge stores its upstream and downstream vertices
private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges) {
    Set<ExecutionEdge> executionEdges = new LinkedHashSet<>();

    Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new HashMap();

    // Sort the edges: first by the input vertex id, and when the input vertices
    // are equal, by the target vertex id
    List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
    Collections.sort(
            sortedLogicalEdges,
            (o1, o2) -> {
                if (o1.getInputVertexId() != o2.getInputVertexId()) {
                    return o1.getInputVertexId() > o2.getInputVertexId() ? 1 : -1;
                }
                if (o1.getTargetVertexId() != o2.getTargetVertexId()) {
                    return o1.getTargetVertexId() > o2.getTargetVertexId() ? 1 : -1;
                }
                return 0;
            });
    // Convert each logical-plan edge into an execution-plan edge
    for (LogicalEdge logicalEdge : sortedLogicalEdges) {
        LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
        ExecutionVertex executionInputVertex =
                logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                        logicalInputVertex.getVertexId(),
                        vertexId -> {
                            long newId = idGenerator.getNextId();
                            // Recreate the Action for every logical-plan vertex
                            Action newLogicalInputAction =
                                    recreateAction(
                                            logicalInputVertex.getAction(),
                                            newId,
                                            logicalInputVertex.getParallelism());
                            // Convert it into an execution-plan vertex
                            return new ExecutionVertex(
                                    newId,
                                    newLogicalInputAction,
                                    logicalInputVertex.getParallelism());
                        });
        // Same as for the input vertex: recreate the execution-plan vertex
        LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
        ExecutionVertex executionTargetVertex =
                logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                        logicalTargetVertex.getVertexId(),
                        vertexId -> {
                            long newId = idGenerator.getNextId();
                            Action newLogicalTargetAction =
                                    recreateAction(
                                            logicalTargetVertex.getAction(),
                                            newId,
                                            logicalTargetVertex.getParallelism());
                            return new ExecutionVertex(
                                    newId,
                                    newLogicalTargetAction,
                                    logicalTargetVertex.getParallelism());
                        });
        // Build the execution-plan edge
        ExecutionEdge executionEdge =
                new ExecutionEdge(executionInputVertex, executionTargetVertex);
        executionEdges.add(executionEdge);
    }
    return executionEdges;
}
```
  • Step 2: insert shuffle edges where needed

```java
private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> executionEdges) {
    // Keyed by the upstream vertex id; the list stores all downstream vertices
    Map<Long, List<ExecutionVertex>> targetVerticesMap = new LinkedHashMap<>();
    // Stores only vertices of type Source
    Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
    executionEdges.forEach(
            edge -> {
                ExecutionVertex leftVertex = edge.getLeftVertex();
                ExecutionVertex rightVertex = edge.getRightVertex();
                if (leftVertex.getAction() instanceof SourceAction) {
                    sourceExecutionVertices.add(leftVertex);
                }
                targetVerticesMap
                        .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                        .add(rightVertex);
            });
    if (sourceExecutionVertices.size() != 1) {
        return executionEdges;
    }
    ExecutionVertex sourceExecutionVertex = sourceExecutionVertices.stream().findFirst().get();
    Action sourceAction = sourceExecutionVertex.getAction();
    List<CatalogTable> producedCatalogTables = new ArrayList<>();
    if (sourceAction instanceof SourceAction) {
        try {
            producedCatalogTables =
                    ((SourceAction<?, ?, ?>) sourceAction)
                            .getSource()
                            .getProducedCatalogTables();
        } catch (UnsupportedOperationException e) {
        }
    } else if (sourceAction instanceof TransformChainAction) {
        return executionEdges;
    } else {
        throw new SeaTunnelException(
                "source action must be SourceAction or TransformChainAction");
    }
    // Return directly when the source produces only a single table, or
    // when the source has only one downstream output
    if (producedCatalogTables.size() <= 1
            || targetVerticesMap.get(sourceExecutionVertex.getVertexId()).size() <= 1) {
        return executionEdges;
    }

    List<ExecutionVertex> sinkVertices =
            targetVerticesMap.get(sourceExecutionVertex.getVertexId());
    // Check whether any other Action type exists. At this step the downstream vertices can
    // be of at most two kinds, Transform and Sink; here only Sink is allowed
    Optional<ExecutionVertex> hasOtherAction =
            sinkVertices.stream()
                    .filter(vertex -> !(vertex.getAction() instanceof SinkAction))
                    .findFirst();
    checkArgument(!hasOtherAction.isPresent());
    // Reaching this point means the current scenario is:
    // exactly one source that produces multiple tables, with multiple sink nodes
    // depending on it. The job therefore has only two kinds of vertices: one
    // multi-table Source vertex and a group of Sink vertices depending on it.
    // A new shuffle vertex is created and inserted between them, rewriting the
    // dependency into source -> shuffle -> multiple sinks
    Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
    // We will not dive into the shuffle strategy here
    ShuffleStrategy shuffleStrategy =
            ShuffleMultipleRowStrategy.builder()
                    .jobId(jobImmutableInformation.getJobId())
                    .inputPartitions(sourceAction.getParallelism())
                    .catalogTables(producedCatalogTables)
                    .queueEmptyQueueTtl(
                            (int)
                                    (engineConfig.getCheckpointConfig().getCheckpointInterval()
                                            * 3))
                    .build();
    ShuffleConfig shuffleConfig =
            ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();

    long shuffleVertexId = idGenerator.getNextId();
    String shuffleActionName = String.format("Shuffle [%s]", sourceAction.getName());
    ShuffleAction shuffleAction =
            new ShuffleAction(shuffleVertexId, shuffleActionName, shuffleConfig);
    shuffleAction.setParallelism(sourceAction.getParallelism());
    ExecutionVertex shuffleVertex =
            new ExecutionVertex(shuffleVertexId, shuffleAction, shuffleAction.getParallelism());
    ExecutionEdge sourceToShuffleEdge = new ExecutionEdge(sourceExecutionVertex, shuffleVertex);
    newExecutionEdges.add(sourceToShuffleEdge);
    // Lower the parallelism of every sink vertex to 1
    for (ExecutionVertex sinkVertex : sinkVertices) {
        sinkVertex.setParallelism(1);
        sinkVertex.getAction().setParallelism(1);
        ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, sinkVertex);
        newExecutionEdges.add(shuffleToSinkEdge);
    }
    return newExecutionEdges;
}
```

This shuffle step targets a special scenario: when the source supports reading multiple tables and several sink nodes depend on that single source, a shuffle node is inserted between them.
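To make the rewrite concrete, here is an illustrative before/after sketch of the DAG for that scenario (structure only, not actual engine output):

```
Before:  MultiTableSource ---+---> Sink-A
         (parallelism n)     +---> Sink-B

After:   MultiTableSource ---> Shuffle ---+---> Sink-A (parallelism forced to 1)
         (parallelism n)  (parallelism n) +---> Sink-B (parallelism forced to 1)
```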

  • Step 3: chain transforms together (an illustration of the effect follows the code)
```java
private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge> executionEdges) {
    // Three structures are used: all Source vertices, plus every input and output vertex.
    // inputVerticesMap keys on the downstream vertex id and stores all upstream inputs;
    // targetVerticesMap keys on the upstream vertex id and stores all downstream outputs
    Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
    Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
    Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
    executionEdges.forEach(
            edge -> {
                ExecutionVertex leftVertex = edge.getLeftVertex();
                ExecutionVertex rightVertex = edge.getRightVertex();
                if (leftVertex.getAction() instanceof SourceAction) {
                    sourceExecutionVertices.add(leftVertex);
                }
                inputVerticesMap
                        .computeIfAbsent(rightVertex.getVertexId(), id -> new ArrayList<>())
                        .add(leftVertex);
                targetVerticesMap
                        .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                        .add(rightVertex);
            });
    Map<Long, ExecutionVertex> transformChainVertexMap = new HashMap<>();
    Map<Long, Long> chainedTransformVerticesMapping = new HashMap<>();
    // Iterate over every source, i.e. start the traversal from every head vertex of the DAG
    for (ExecutionVertex sourceVertex : sourceExecutionVertices) {
        List<ExecutionVertex> vertices = new ArrayList<>();
        vertices.add(sourceVertex);
        for (int index = 0; index < vertices.size(); index++) {
            ExecutionVertex vertex = vertices.get(index);
            fillChainedTransformExecutionVertex(
                    vertex,
                    chainedTransformVerticesMapping,
                    transformChainVertexMap,
                    executionEdges,
                    Collections.unmodifiableMap(inputVerticesMap),
                    Collections.unmodifiableMap(targetVerticesMap));
            // If the current vertex has downstream vertices, add them all to the list;
            // the loop then processes the newly added vertices, which may be Transform
            // or Sink vertices
            if (targetVerticesMap.containsKey(vertex.getVertexId())) {
                vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
            }
        }
    }
    // After the loop, the chainable Transform vertices have been merged, and the chained
    // edges were removed from the execution plan. The plan no longer forms a graph, so it
    // must be rebuilt
    Set<ExecutionEdge> transformChainEdges = new LinkedHashSet<>();
    // Iterate over the remaining edges
    for (ExecutionEdge executionEdge : executionEdges) {
        ExecutionVertex leftVertex = executionEdge.getLeftVertex();
        ExecutionVertex rightVertex = executionEdge.getRightVertex();
        boolean needRebuild = false;
        // Look up this edge's input and output vertices in the chained map.
        // If a vertex is found there, it has been chained away, and we must find the
        // vertex it was chained into and repair the DAG
        if (chainedTransformVerticesMapping.containsKey(leftVertex.getVertexId())) {
            needRebuild = true;
            leftVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(leftVertex.getVertexId()));
        }
        if (chainedTransformVerticesMapping.containsKey(rightVertex.getVertexId())) {
            needRebuild = true;
            rightVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(rightVertex.getVertexId()));
        }
        if (needRebuild) {
            executionEdge = new ExecutionEdge(leftVertex, rightVertex);
        }
        transformChainEdges.add(executionEdge);
    }
    return transformChainEdges;
}

private void fillChainedTransformExecutionVertex(
        ExecutionVertex currentVertex,
        Map<Long, Long> chainedTransformVerticesMapping,
        Map<Long, ExecutionVertex> transformChainVertexMap,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    // Return if the map already contains the current vertex
    if (chainedTransformVerticesMapping.containsKey(currentVertex.getVertexId())) {
        return;
    }

    List<ExecutionVertex> transformChainedVertices = new ArrayList<>();
    collectChainedVertices(
            currentVertex,
            transformChainedVertices,
            executionEdges,
            inputVerticesMap,
            targetVerticesMap);
    // A non-empty list means the transform vertices it holds can be merged into one
    if (transformChainedVertices.size() > 0) {
        long newVertexId = idGenerator.getNextId();
        List<SeaTunnelTransform> transforms = new ArrayList<>(transformChainedVertices.size());
        List<String> names = new ArrayList<>(transformChainedVertices.size());
        Set<URL> jars = new HashSet<>();
        Set<ConnectorJarIdentifier> identifiers = new HashSet<>();

        transformChainedVertices.stream()
                .peek(
                        // Record the mapping from every historical vertex id to the new id
                        vertex ->
                                chainedTransformVerticesMapping.put(
                                        vertex.getVertexId(), newVertexId))
                .map(ExecutionVertex::getAction)
                .map(action -> (TransformAction) action)
                .forEach(
                        action -> {
                            transforms.add(action.getTransform());
                            jars.addAll(action.getJarUrls());
                            identifiers.addAll(action.getConnectorJarIdentifiers());
                            names.add(action.getName());
                        });
        String transformChainActionName =
                String.format("TransformChain[%s]", String.join("->", names));
        // Merge multiple TransformActions into one TransformChainAction
        TransformChainAction transformChainAction =
                new TransformChainAction(
                        newVertexId, transformChainActionName, jars, identifiers, transforms);
        transformChainAction.setParallelism(currentVertex.getAction().getParallelism());

        ExecutionVertex executionVertex =
                new ExecutionVertex(
                        newVertexId, transformChainAction, currentVertex.getParallelism());
        // Store the rewritten vertex into the state
        transformChainVertexMap.put(newVertexId, executionVertex);
        chainedTransformVerticesMapping.put(
                currentVertex.getVertexId(), executionVertex.getVertexId());
    }
}

private void collectChainedVertices(
        ExecutionVertex currentVertex,
        List<ExecutionVertex> chainedVertices,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    Action action = currentVertex.getAction();
    // Only TransformActions are merged
    if (action instanceof TransformAction) {
        if (chainedVertices.size() == 0) {
            // When the list of vertices to chain is empty, add the current vertex.
            // Entering this branch means the current vertex is a TransformAction and
            // the chain list is empty, e.g. the first unconstrained Transform vertex
            chainedVertices.add(currentVertex);
        } else if (inputVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
            // Entering this branch means the chain list chainedVertices already holds
            // at least one TransformAction, and the upstream Transform vertex has
            // exactly one downstream vertex -- the current one (guaranteed by the
            // check below). Chain the current TransformAction with the previous one
            // and remove that edge from the execution plan
            executionEdges.remove(
                    new ExecutionEdge(
                            chainedVertices.get(chainedVertices.size() - 1), currentVertex));
            // Add the current vertex to the chain list
            chainedVertices.add(currentVertex);
        } else {
            return;
        }
    } else {
        return;
    }

    // It cannot chain to any target vertex if it has multiple target vertices.
    if (targetVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
        // Try to chain further only when the current vertex has exactly one downstream
        // vertex. A vertex with multiple downstream vertices never chains with them,
        // which guarantees the one-to-one relationship assumed by the chaining above
        collectChainedVertices(
                targetVerticesMap.get(currentVertex.getVertexId()).get(0),
                chainedVertices,
                executionEdges,
                inputVerticesMap,
                targetVerticesMap);
    }
}
```
  • Step 4: generate pipelines

```java
private List<Pipeline> generatePipelines(Set<ExecutionEdge> executionEdges) {
    // Collect every execution-plan vertex
    Set<ExecutionVertex> executionVertices = new LinkedHashSet<>();
    for (ExecutionEdge edge : executionEdges) {
        executionVertices.add(edge.getLeftVertex());
        executionVertices.add(edge.getRightVertex());
    }

    // Invoke the pipeline generator to convert the execution plan into pipelines
    PipelineGenerator pipelineGenerator =
            new PipelineGenerator(executionVertices, new ArrayList<>(executionEdges));
    List<Pipeline> pipelines = pipelineGenerator.generatePipelines();

    Set<String> duplicatedActionNames = new HashSet<>();
    Set<String> actionNames = new HashSet<>();
    for (Pipeline pipeline : pipelines) {
        Integer pipelineId = pipeline.getId();
        for (ExecutionVertex vertex : pipeline.getVertexes().values()) {
            // For every vertex of the current pipeline, reset the Action name so that it
            // carries the pipeline name
            Action action = vertex.getAction();
            String actionName = String.format("pipeline-%s [%s]", pipelineId, action.getName());
            action.setName(actionName);
            if (actionNames.contains(actionName)) {
                duplicatedActionNames.add(actionName);
            }
            actionNames.add(actionName);
        }
    }
    // Validate: duplicated Action names must not exist
    checkArgument(
            duplicatedActionNames.isEmpty(),
            "Action name is duplicated: " + duplicatedActionNames);

    return pipelines;
}
```

```java
public PipelineGenerator(Collection<ExecutionVertex> vertices, List<ExecutionEdge> edges) {
    this.vertices = vertices;
    this.edges = edges;
}

public List<Pipeline> generatePipelines() {
    List<ExecutionEdge> executionEdges = expandEdgeByParallelism(edges);

    // Split the execution plan by connectivity into several unrelated execution plans
    List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(executionEdges);

    edgesList =
            edgesList.stream()
                    .flatMap(e -> this.splitUnionEdge(e).stream())
                    .collect(Collectors.toList());

    // just convert execution plan to pipeline at now. We should split it to multi pipeline with
    // cache in the future
    IdGenerator idGenerator = new IdGenerator();
    // Convert each execution-plan graph into a Pipeline
    return edgesList.stream()
            .map(
                    e -> {
                        Map<Long, ExecutionVertex> vertexes = new HashMap<>();
                        List<ExecutionEdge> pipelineEdges =
                                e.stream()
                                        .map(
                                                edge -> {
                                                    if (!vertexes.containsKey(
                                                            edge.getLeftVertexId())) {
                                                        vertexes.put(
                                                                edge.getLeftVertexId(),
                                                                edge.getLeftVertex());
                                                    }
                                                    ExecutionVertex source =
                                                            vertexes.get(edge.getLeftVertexId());
                                                    if (!vertexes.containsKey(
                                                            edge.getRightVertexId())) {
                                                        vertexes.put(
                                                                edge.getRightVertexId(),
                                                                edge.getRightVertex());
                                                    }
                                                    ExecutionVertex destination =
                                                            vertexes.get(
                                                                    edge.getRightVertexId());
                                                    return new ExecutionEdge(
                                                            source, destination);
                                                })
                                        .collect(Collectors.toList());
                        return new Pipeline(
                                (int) idGenerator.getNextId(), pipelineEdges, vertexes);
                    })
            .collect(Collectors.toList());
}
```


  • Step 5: build the ExecutionPlan instance from the pipelines generated in step 4

To summarize, generating the execution plan does the following to the logical plan:

  1. When the source produces multiple tables and several sink nodes depend on it, a shuffle node is inserted between them
  2. Transform nodes are chained where possible, merging several transform nodes into one
  3. The job is split up: one `configuration file/LogicalDag` is divided into several unrelated tasks, a `List<Pipeline>`

![](https://img-blog.csdnimg.cn/3e3c46a29e1b412fa772e08bafb69735.png)

Generating the Physical Plan

Before looking at how the physical plan is generated, let's first see what information a physical plan contains. Here are the physical plan and its related internal classes:

```java
public class PhysicalPlan {
    private final List<SubPlan> pipelineList;
    private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
    private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
    private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
    private final JobImmutableInformation jobImmutableInformation;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<JobResult> jobEndFuture;
    private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
    private final String jobFullName;
    private final long jobId;
    private JobMaster jobMaster;
    private boolean makeJobEndWhenPipelineEnded = true;
    private volatile boolean isRunning = false;
}
```

The key field in this class is pipelineList, a list of SubPlan:

```java
public class SubPlan {
    private final int pipelineMaxRestoreNum;
    private final int pipelineRestoreIntervalSeconds;
    private final List<PhysicalVertex> physicalVertexList;
    private final List<PhysicalVertex> coordinatorVertexList;
    private final int pipelineId;
    private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
    private final AtomicInteger canceledTaskNum = new AtomicInteger(0);
    private final AtomicInteger failedTaskNum = new AtomicInteger(0);
    private final String pipelineFullName;
    private final IMap<Object, Object> runningJobStateIMap;
    private final Map<String, String> tags;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<PipelineExecutionState> pipelineFuture;
    private final PipelineLocation pipelineLocation;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
    private final ExecutorService executorService;
    private JobMaster jobMaster;
    private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
    private Integer pipelineRestoreNum;
    private final Object restoreLock = new Object();
    private volatile PipelineStatus currPipelineStatus;
    public volatile boolean isRunning = false;
    private Map<TaskGroupLocation, SlotProfile> slotProfiles;
}
```

SubPlan in turn maintains a list of PhysicalVertex physical vertices, split into physical task vertices and coordinator vertices:

```java
public class PhysicalVertex {
    private final TaskGroupLocation taskGroupLocation;
    private final String taskFullName;
    private final TaskGroupDefaultImpl taskGroup;
    private final ExecutorService executorService;
    private final FlakeIdGenerator flakeIdGenerator;
    private final Set<URL> pluginJarsUrls;
    private final Set<ConnectorJarIdentifier> connectorJarIdentifiers;
    private final IMap<Object, Object> runningJobStateIMap;
    private CompletableFuture<TaskExecutionState> taskFuture;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private final NodeEngine nodeEngine;
    private JobMaster jobMaster;
    private volatile ExecutionState currExecutionState = ExecutionState.CREATED;
    public volatile boolean isRunning = false;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
}

public class TaskGroupDefaultImpl implements TaskGroup {
    private final TaskGroupLocation taskGroupLocation;
    private final String taskGroupName;
    // Stores the tasks this physical vertex needs to execute.
    // Each task may read data, write data,
    // split data, perform checkpoints, and so on
    private final Map<Long, Task> tasks;
}

public PhysicalPlanGenerator(
        @NonNull ExecutionPlan executionPlan,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType) {
    this.pipelines = executionPlan.getPipelines();
    this.nodeEngine = nodeEngine;
    this.jobImmutableInformation = jobImmutableInformation;
    this.initializationTimestamp = initializationTimestamp;
    this.executorService = executorService;
    this.flakeIdGenerator = flakeIdGenerator;
    // the checkpoint of a pipeline
    this.pipelineTasks = new HashSet<>();
    this.startingTasks = new HashSet<>();
    this.subtaskActions = new HashMap<>();
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.queueType = queueType;
}

public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
    // Read the node tag filter from the user's configuration; it selects the nodes
    // the job will run on
    Map<String, String> tagFilter =
            (Map<String, String>)
                    jobImmutableInformation
                            .getJobConfig()
                            .getEnvOptions()
                            .get(EnvCommonOptions.NODE_TAG_FILTER.key());
    // TODO Determine which tasks do not need to be restored according to state
    CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
            waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();

    Map<Integer, CheckpointPlan> checkpointPlans = new HashMap<>();
    final int totalPipelineNum = pipelines.size();
    Stream<SubPlan> subPlanStream =
            pipelines.stream()
                    .map(
                            pipeline -> {
                                // Clear the per-pipeline state on every iteration
                                this.pipelineTasks.clear();
                                this.startingTasks.clear();
                                this.subtaskActions.clear();
                                final int pipelineId = pipeline.getId();
                                // Information of the current pipeline
                                final List<ExecutionEdge> edges = pipeline.getEdges();
                                // All SourceActions
                                List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
                                // Create the source split enumerator tasks, i.e.
                                // SourceSplitEnumeratorTask. Such a task drives the
                                // connector's SourceSplitEnumerator class, if the
                                // connector provides one
                                List<PhysicalVertex> coordinatorVertexList =
                                        getEnumeratorTask(
                                                sources, pipelineId, totalPipelineNum);
                                // Create the sink commit tasks, i.e.
                                // SinkAggregatedCommitterTask. Such a task drives the
                                // connector's SinkAggregatedCommitter class, if the
                                // connector provides one.
                                // Both kinds run as coordinator tasks
                                coordinatorVertexList.addAll(
                                        getCommitterTask(edges, pipelineId, totalPipelineNum));
                                List<PhysicalVertex> physicalVertexList =
                                        getSourceTask(
                                                edges, sources, pipelineId, totalPipelineNum);

                                physicalVertexList.addAll(
                                        getShuffleTask(edges, pipelineId, totalPipelineNum));

                                CompletableFuture<PipelineStatus> pipelineFuture =
                                        new CompletableFuture<>();
                                waitForCompleteBySubPlanList.add(
                                        new PassiveCompletableFuture<>(pipelineFuture));
                                // Register the checkpoint plan for this pipeline
                                checkpointPlans.put(
                                        pipelineId,
                                        CheckpointPlan.builder()
                                                .pipelineId(pipelineId)
                                                .pipelineSubtasks(pipelineTasks)
                                                .startingSubtasks(startingTasks)
                                                .pipelineActions(pipeline.getActions())
                                                .subtaskActions(subtaskActions)
                                                .build());
                                return new SubPlan(
                                        pipelineId,
                                        totalPipelineNum,
                                        initializationTimestamp,
                                        physicalVertexList,
                                        coordinatorVertexList,
                                        jobImmutableInformation,
                                        executorService,
                                        runningJobStateIMap,
                                        runningJobStateTimestampsIMap,
                                        tagFilter);
                            });

    PhysicalPlan physicalPlan =
            new PhysicalPlan(
                    subPlanStream.collect(Collectors.toList()),
                    executorService,
                    jobImmutableInformation,
                    initializationTimestamp,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap);
    return Tuple2.tuple2(physicalPlan, checkpointPlans);
}
```

Generating the physical plan is the process of converting the execution plan into SeaTunnelTasks, adding the various coordination tasks along the way, such as the data split (enumerator) task, the data commit task, and the checkpoint task.

Inside a SeaTunnelTask, the work is wrapped into flow life cycles such as SourceFlowLifeCycle, SinkFlowLifeCycle, TransformFlowLifeCycle, ShuffleSinkFlowLifeCycle, and ShuffleSourceFlowLifeCycle.

Let's take SourceFlowLifeCycle and SinkFlowLifeCycle as examples.

  • SourceFlowLifeCycle

```java
@Override
public void init() throws Exception {
    this.splitSerializer = sourceAction.getSource().getSplitSerializer();
    this.reader =
            sourceAction
                    .getSource()
                    .createReader(
                            new SourceReaderContext(
                                    indexID,
                                    sourceAction.getSource().getBoundedness(),
                                    this,
                                    metricsContext,
                                    eventListener));
    this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}

@Override
public void open() throws Exception {
    reader.open();
    register();
}

public void collect() throws Exception {
    if (!prepareClose) {
        if (schemaChanging()) {
            log.debug("schema is changing, stop reader collect records");
            Thread.sleep(200);
            return;
        }
        reader.pollNext(collector);
        if (collector.isEmptyThisPollNext()) {
            Thread.sleep(100);
        } else {
            collector.resetEmptyThisPollNext();
            /**
             * The current thread obtain a checkpoint lock in the method {@link
             * SourceReader#pollNext(Collector)}. When trigger the checkpoint or savepoint,
             * other threads try to obtain the lock in the method {@link
             * SourceFlowLifeCycle#triggerBarrier(Barrier)}. When high CPU load, checkpoint
             * process may be blocked as long time. So we need sleep to free the CPU.
             */
            Thread.sleep(0L);
        }
        if (collector.captureSchemaChangeBeforeCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
            runningTask.triggerSchemaChangeBeforeCheckpoint().get();
            log.info("triggered schema-change-before checkpoint, stopping collect data");
        } else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
            runningTask.triggerSchemaChangeAfterCheckpoint().get();
            log.info("triggered schema-change-after checkpoint, stopping collect data");
        }
    } else {
        Thread.sleep(100);
    }
}
```


As we can see, the actual data reading of the Source is driven by SourceFlowLifeCycle's collect method. Each record read is handed to a SeaTunnelSourceCollector, which updates the metrics on every record received and forwards the record to the downstream outputs:

```java
@Override
public void collect(T row) {
    try {
        if (row instanceof SeaTunnelRow) {
            String tableId = ((SeaTunnelRow) row).getTableId();
            int size;
            if (rowType instanceof SeaTunnelRowType) {
                size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
            } else if (rowType instanceof MultipleRowType) {
                size = ((SeaTunnelRow) row).getBytesSize(rowTypeMap.get(tableId));
            } else {
                throw new SeaTunnelEngineException(
                        "Unsupported row type: " + rowType.getClass().getName());
            }
            sourceReceivedBytes.inc(size);
            sourceReceivedBytesPerSeconds.markEvent(size);
            flowControlGate.audit((SeaTunnelRow) row);
            if (StringUtils.isNotEmpty(tableId)) {
                String tableName = getFullName(TablePath.of(tableId));
                Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
                if (Objects.nonNull(sourceTableCounter)) {
                    sourceTableCounter.inc();
                } else {
                    Counter counter =
                            metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName);
                    counter.inc();
                    sourceReceivedCountPerTable.put(tableName, counter);
                }
            }
        }
        sendRecordToNext(new Record<>(row));
        emptyThisPollNext = false;
        sourceReceivedCount.inc();
        sourceReceivedQPS.markEvent();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

public void sendRecordToNext(Record<?> record) throws IOException {
    synchronized (checkpointLock) {
        for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
            output.received(record);
        }
    }
}
```
  • SinkFlowLifeCycle

```java
@Override
public void received(Record<?> record) {
    try {
        if (record.getData() instanceof Barrier) {
            long startTime = System.currentTimeMillis();

            Barrier barrier = (Barrier) record.getData();
            if (barrier.prepareClose(this.taskLocation)) {
                prepareClose = true;
            }
            if (barrier.snapshot()) {
                try {
                    lastCommitInfo = writer.prepareCommit();
                } catch (Exception e) {
                    writer.abortPrepare();
                    throw e;
                }
                List<StateT> states = writer.snapshotState(barrier.getId());
                if (!writerStateSerializer.isPresent()) {
                    runningTask.addState(
                            barrier, ActionStateKey.of(sinkAction), Collections.emptyList());
                } else {
                    runningTask.addState(
                            barrier,
                            ActionStateKey.of(sinkAction),
                            serializeStates(writerStateSerializer.get(), states));
                }
                if (containAggCommitter) {
                    CommitInfoT commitInfoT = null;
                    if (lastCommitInfo.isPresent()) {
                        commitInfoT = lastCommitInfo.get();
                    }
                    runningTask
                            .getExecutionContext()
                            .sendToMember(
                                    new SinkPrepareCommitOperation<CommitInfoT>(
                                            barrier,
                                            committerTaskLocation,
                                            commitInfoSerializer.isPresent()
                                                    ? commitInfoSerializer
                                                            .get()
                                                            .serialize(commitInfoT)
                                                    : null),
                                    committerTaskAddress)
                            .join();
                }
            } else {
                if (containAggCommitter) {
                    runningTask
                            .getExecutionContext()
                            .sendToMember(
                                    new BarrierFlowOperation(barrier, committerTaskLocation),
                                    committerTaskAddress)
                            .join();
                }
            }
            runningTask.ack(barrier);

            log.debug(
                    "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]",
                    barrier.getId(),
                    System.currentTimeMillis() - startTime,
                    taskLocation);
        } else if (record.getData() instanceof SchemaChangeEvent) {
            if (prepareClose) {
                return;
            }
            SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
            writer.applySchemaChange(event);
        } else {
            if (prepareClose) {
                return;
            }
            writer.write((T) record.getData());
            sinkWriteCount.inc();
            sinkWriteQPS.markEvent();
            if (record.getData() instanceof SeaTunnelRow) {
                long size = ((SeaTunnelRow) record.getData()).getBytesSize();
                sinkWriteBytes.inc(size);
                sinkWriteBytesPerSeconds.markEvent(size);
                String tableId = ((SeaTunnelRow) record.getData()).getTableId();
                if (StringUtils.isNotBlank(tableId)) {
                    String tableName = getFullName(TablePath.of(tableId));
                    Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
                    if (Objects.nonNull(sinkTableCounter)) {
                        sinkTableCounter.inc();
                    } else {
                        Counter counter =
                                metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName);
                        counter.inc();
                        sinkWriteCountPerTable.put(tableName, counter);
                    }
                }
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```

Likewise, this class is where the Sink's writer is actually invoked to write data to the downstream system.

Task Execution

In CoordinatorService, the init method produces the physical plan, after which run is invoked to actually start the job:

```java
// CoordinatorService
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
...
jobMaster.run();

// JobMaster
public void run() {
    ...
    physicalPlan.startJob();
    ...
}
```

Starting the job in JobMaster calls PhysicalPlan's startJob method:

```java
public void startJob() {
    isRunning = true;
    log.info("{} state process is start", getJobFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process is stopped", jobFullName));
        return;
    }
    switch (getJobStatus()) {
        case CREATED:
            updateJobState(JobStatus.SCHEDULED);
            break;
        case SCHEDULED:
            getPipelineList()
                    .forEach(
                            subPlan -> {
                                if (PipelineStatus.CREATED.equals(
                                        subPlan.getCurrPipelineStatus())) {
                                    subPlan.startSubPlanStateProcess();
                                }
                            });
            updateJobState(JobStatus.RUNNING);
            break;
        case RUNNING:
        case DOING_SAVEPOINT:
            break;
        case FAILING:
        case CANCELING:
            jobMaster.neverNeedRestore();
            getPipelineList().forEach(SubPlan::cancelPipeline);
            break;
        case FAILED:
        case CANCELED:
        case SAVEPOINT_DONE:
        case FINISHED:
            stopJobStateProcess();
            jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Job State: " + getJobStatus());
    }
}
```

In PhysicalPlan, starting the job moves the job state to SCHEDULED and then calls each SubPlan's start method.
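Reading the switch above, the job-level state machine can be summarized roughly as follows (simplified; savepoint states omitted):

```
CREATED --> SCHEDULED --> RUNNING --> FINISHED
                             |--> FAILING   --> FAILED
                             |--> CANCELING --> CANCELED
```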

```java
public void startSubPlanStateProcess() {
    isRunning = true;
    log.info("{} state process is start", getPipelineFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process not start", pipelineFullName));
        return;
    }
    PipelineStatus state = getCurrPipelineStatus();
    switch (state) {
        case CREATED:
            updatePipelineState(PipelineStatus.SCHEDULED);
            break;
        case SCHEDULED:
            try {
                ResourceUtils.applyResourceForPipeline(jobMaster.getResourceManager(), this);
                log.debug(
                        "slotProfiles: {}, PipelineLocation: {}",
                        slotProfiles,
                        this.getPipelineLocation());
                updatePipelineState(PipelineStatus.DEPLOYING);
            } catch (Exception e) {
                makePipelineFailing(e);
            }
            break;
        case DEPLOYING:
            coordinatorVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });
            physicalVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });
            updatePipelineState(PipelineStatus.RUNNING);
            break;
        case RUNNING:
            break;
        case FAILING:
        case CANCELING:
            coordinatorVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });
            physicalVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });
            break;
        case FAILED:
        case CANCELED:
            if (checkNeedRestore(state) && prepareRestorePipeline()) {
                jobMaster.releasePipelineResource(this);
                restorePipeline();
                return;
            }
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(pipelineId, state, errorByPhysicalVertex.get()));
            return;
        case FINISHED:
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(
                            pipelineId, getPipelineState(), errorByPhysicalVertex.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Pipeline State: " + getPipelineState());
    }
}
```

In SubPlan, when the state is SCHEDULED, resources are applied for:

```java
public static void applyResourceForPipeline(
        @NonNull ResourceManager resourceManager, @NonNull SubPlan subPlan) {
    Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
    Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
    // TODO If there is no enough resources for tasks, we need add some wait profile
    subPlan.getCoordinatorVertexList()
            .forEach(
                    coordinator ->
                            futures.put(
                                    coordinator.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, coordinator, subPlan.getTags())));

    subPlan.getPhysicalVertexList()
            .forEach(
                    task ->
                            futures.put(
                                    task.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, task, subPlan.getTags())));

    futures.forEach(
            (key, value) -> {
                try {
                    slotProfiles.put(key, value == null ? null : value.join());
                } catch (CompletionException e) {
                    // do nothing
                }
            });
    // set it first, avoid can't get it when get resource not enough exception and need release
    // applied resource
    subPlan.getJobMaster().setOwnedSlotProfiles(subPlan.getPipelineLocation(), slotProfiles);
    if (futures.size() != slotProfiles.size()) {
        throw new NoEnoughResourceException();
    }
}

public static CompletableFuture<SlotProfile> applyResourceForTask(
        ResourceManager resourceManager, PhysicalVertex task, Map<String, String> tags) {
    // TODO custom resource size
    return resourceManager.applyResource(
            task.getTaskGroupLocation().getJobId(), new ResourceProfile(), tags);
}

public CompletableFuture<List<SlotProfile>> applyResources(
        long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter)
        throws NoEnoughResourceException {
    waitingWorkerRegister();
    ConcurrentMap<Address, WorkerProfile> matchedWorker = filterWorkerByTag(tagFilter);
    if (matchedWorker.isEmpty()) {
        log.error("No matched worker with tag filter {}.", tagFilter);
        throw new NoEnoughResourceException();
    }
    return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this)
            .request(tagFilter);
}
```

Within one SubPlan, resources are applied for all of its tasks through the ResourceManager. The tags configured in the user's job are used to select the nodes the job will run on, which pins the job to designated nodes and achieves resource isolation.
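The idea behind the tag matching is simple: a worker qualifies only if it carries every key/value pair of the job's tag filter. Below is a minimal sketch of that check; it illustrates the concept behind filterWorkerByTag, not its exact implementation:

```java
import java.util.Map;

// Sketch: a worker matches when it carries every key/value pair of the tag filter.
// This mirrors the idea behind filterWorkerByTag, not its exact implementation.
public class TagFilterSketch {
    static boolean matches(Map<String, String> workerTags, Map<String, String> tagFilter) {
        if (tagFilter == null || tagFilter.isEmpty()) {
            return true; // no filter configured: every worker is a candidate
        }
        return tagFilter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(workerTags.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of("group", "etl", "zone", "a");
        System.out.println(matches(worker, Map.of("group", "etl")));  // true
        System.out.println(matches(worker, Map.of("group", "olap"))); // false
    }
}
```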

```java
public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
    // Shuffle the order to ensure random selection of workers
    List<WorkerProfile> workerProfiles =
            Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0]));
    Collections.shuffle(workerProfiles);
    // Check if there are still unassigned slots
    Optional<WorkerProfile> workerProfile =
            workerProfiles.stream()
                    .filter(
                            worker ->
                                    Arrays.stream(worker.getUnassignedSlots())
                                            .anyMatch(
                                                    slot ->
                                                            slot.getResourceProfile()
                                                                    .enoughThan(r)))
                    .findAny();

    if (!workerProfile.isPresent()) {
        // Check if there are still unassigned resources
        workerProfile =
                workerProfiles.stream()
                        .filter(WorkerProfile::isDynamicSlot)
                        .filter(worker -> worker.getUnassignedResource().enoughThan(r))
                        .findAny();
    }

    return workerProfile;
}

private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(
        int i, ResourceProfile r, WorkerProfile workerProfile) {
    CompletableFuture<SlotAndWorkerProfile> future =
            resourceManager.sendToMember(
                    new RequestSlotOperation(jobId, r), workerProfile.getAddress());
    return future.whenComplete(
            withTryCatch(
                    LOGGER,
                    (slotAndWorkerProfile, error) -> {
                        if (error != null) {
                            throw new RuntimeException(error);
                        } else {
                            resourceManager.heartbeat(slotAndWorkerProfile.getWorkerProfile());
                            addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
                        }
                    }));
}
```

After obtaining all candidate nodes, the list is shuffled and a node whose available resources exceed the requirement is picked at random; a RequestSlotOperation is then sent to that node:

```java
@Override
public synchronized SlotAndWorkerProfile requestSlot(
        long jobId, ResourceProfile resourceProfile) {
    initStatus = false;
    SlotProfile profile = selectBestMatchSlot(resourceProfile);
    if (profile != null) {
        profile.assign(jobId);
        assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
        unassignedResource.accumulateAndGet(
                profile.getResourceProfile(), ResourceProfile::subtract);
        unassignedSlots.remove(profile.getSlotID());
        assignedSlots.put(profile.getSlotID(), profile);
        contexts.computeIfAbsent(
                profile.getSlotID(),
                p -> new SlotContext(profile.getSlotID(), taskExecutionService));
    }
    LOGGER.fine(
            String.format(
                    "received slot request, jobID: %d, resource profile: %s, return: %s",
                    jobId, resourceProfile, profile));
    return new SlotAndWorkerProfile(getWorkerProfile(), profile);
}
```

When the node's SlotService receives the requestSlot request, it updates its own state and replies to the master. If the requested resources ultimately fall short of expectations, a NoEnoughResourceException is raised and the job fails. Once the resource request succeeds, task deployment begins: task.makeTaskGroupDeploy() sends the task to the worker node for execution:

```java
TaskDeployState deployState =
        deploy(jobMaster.getOwnedSlotProfiles(taskGroupLocation));

public TaskDeployState deploy(@NonNull SlotProfile slotProfile) {
    try {
        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
            return deployOnLocal(slotProfile);
        } else {
            return deployOnRemote(slotProfile);
        }
    } catch (Throwable th) {
        return TaskDeployState.failed(th);
    }
}

private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
    return deployInternal(
            taskGroupImmutableInformation -> {
                try {
                    return (TaskDeployState)
                            NodeEngineUtil.sendOperationToMemberNode(
                                            nodeEngine,
                                            new DeployTaskOperation(
                                                    slotProfile,
                                                    nodeEngine
                                                            .getSerializationService()
                                                            .toData(
                                                                    taskGroupImmutableInformation)),
                                            slotProfile.getWorker())
                                    .get();
                } catch (Exception e) {
                    if (getExecutionState().isEndState()) {
                        log.warn(ExceptionUtils.getMessage(e));
                        log.warn(
                                String.format(
                                        "%s deploy error, but the state is already in end state %s, skip this error",
                                        getTaskFullName(), currExecutionState));
                        return TaskDeployState.success();
                    } else {
                        return TaskDeployState.failed(e);
                    }
                }
            });
}
```

When deploying, the task information is sent to the node that was obtained during resource allocation:

```java
public TaskDeployState deployTask(@NonNull Data taskImmutableInformation) {
    TaskGroupImmutableInformation taskImmutableInfo =
            nodeEngine.getSerializationService().toObject(taskImmutableInformation);
    return deployTask(taskImmutableInfo);
}

public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo) {
    logger.info(
            String.format(
                    "received deploying task executionId [%s]",
                    taskImmutableInfo.getExecutionId()));
    TaskGroup taskGroup = null;
    try {
        Set<ConnectorJarIdentifier> connectorJarIdentifiers =
                taskImmutableInfo.getConnectorJarIdentifiers();
        Set<URL> jars = new HashSet<>();
        ClassLoader classLoader;
        if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
            // Prioritize obtaining the jar package file required for the current task execution
            // from the local, if it does not exist locally, it will be downloaded from the
            // master node.
            jars =
                    serverConnectorPackageClient.getConnectorJarFromLocal(
                            connectorJarIdentifiers);
        } else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars())) {
            jars = taskImmutableInfo.getJars();
        }
        classLoader =
                classLoaderService.getClassLoader(
                        taskImmutableInfo.getJobId(), Lists.newArrayList(jars));
        if (jars.isEmpty()) {
            taskGroup =
                    nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
        } else {
            taskGroup =
                    CustomClassLoadedObject.deserializeWithCustomClassLoader(
                            nodeEngine.getSerializationService(),
                            classLoader,
                            taskImmutableInfo.getGroup());
        }

        logger.info(
                String.format(
                        "deploying task %s, executionId [%s]",
                        taskGroup.getTaskGroupLocation(), taskImmutableInfo.getExecutionId()));

        synchronized (this) {
            if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
                throw new RuntimeException(
                        String.format(
                                "TaskGroupLocation: %s already exists",
                                taskGroup.getTaskGroupLocation()));
            }
            deployLocalTask(taskGroup, classLoader, jars);
            return TaskDeployState.success();
        }
    } catch (Throwable t) {
        logger.severe(
                String.format(
                        "TaskGroupID : %s  deploy error with Exception: %s",
                        taskGroup != null && taskGroup.getTaskGroupLocation() != null
                                ? taskGroup.getTaskGroupLocation().toString()
                                : "taskGroupLocation is null",
                        ExceptionUtils.getMessage(t)));
        return TaskDeployState.failed(t);
    }
}
```

When the worker node receives the task, it calls the deployTask method of TaskExecutionService to submit the task to the thread pool created at startup.

Once the task is submitted to the thread pool:

```java
private final class BlockingWorker implements Runnable {

    private final TaskTracker tracker;
    private final CountDownLatch startedLatch;

    private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
        this.tracker = tracker;
        this.startedLatch = startedLatch;
    }

    @Override
    public void run() {
        TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker =
                tracker.taskGroupExecutionTracker;
        ClassLoader classLoader =
                executionContexts
                        .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                        .getClassLoader();
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        final Task t = tracker.task;
        ProgressState result = null;
        try {
            startedLatch.countDown();
            t.init();
            do {
                result = t.call();
            } while (!result.isDone()
                    && isRunning
                    && !taskGroupExecutionTracker.executionCompletedExceptionally());
            ...
    }
}
```

the Task.call method is invoked, and the data synchronization task truly starts running.
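The contract implied by the loop above is cooperative scheduling: call() performs one bounded slice of work and reports progress, and the worker keeps re-invoking it until the returned state says the task is done. Here is a minimal sketch of that contract, using simplified stand-ins rather than SeaTunnel's actual Task and ProgressState types:

```java
// Sketch of the cooperative task contract: call() does one bounded slice of work,
// and the worker thread re-invokes it until the task reports completion.
// These are simplified stand-ins, not SeaTunnel's actual interfaces.
public class TaskLoopSketch {
    enum ProgressState {
        MADE_PROGRESS, DONE;
        boolean isDone() { return this == DONE; }
    }

    interface Task {
        void init() throws Exception;
        ProgressState call() throws Exception; // one slice of work; must not block forever
    }

    public static void main(String[] args) throws Exception {
        Task task = new Task() {
            int remaining = 3;
            public void init() {}
            public ProgressState call() {
                System.out.println("processed one slice, remaining=" + (--remaining));
                return remaining == 0 ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
            }
        };
        task.init();
        ProgressState result;
        do {
            result = task.call();
        } while (!result.isDone());
    }
}
```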

ClassLoader

SeaTunnel replaces the default ClassLoader with a child-first one, thereby avoiding class conflicts with other components:

```java
@Override
public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars) {
    log.debug("Get classloader for job {} with jars {}", jobId, jars);
    if (cacheMode) {
        // with cache mode, all jobs share the same classloader if the jars are the same
        jobId = 1L;
    }
    if (!classLoaderCache.containsKey(jobId)) {
        classLoaderCache.put(jobId, new ConcurrentHashMap<>());
        classLoaderReferenceCount.put(jobId, new ConcurrentHashMap<>());
    }
    Map<String, ClassLoader> classLoaderMap = classLoaderCache.get(jobId);
    String key = covertJarsToKey(jars);
    if (classLoaderMap.containsKey(key)) {
        classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
        return classLoaderMap.get(key);
    } else {
        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
        log.info("Create classloader for job {} with jars {}", jobId, jars);
        classLoaderMap.put(key, classLoader);
        classLoaderReferenceCount.get(jobId).put(key, new AtomicInteger(1));
        return classLoader;
    }
}
```
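The internals of SeaTunnelChildFirstClassLoader are not shown here; below is a minimal child-first URLClassLoader sketch that illustrates the idea of trying the job's own jars before delegating to the parent. A production implementation (SeaTunnel's included) must still delegate core packages such as java.* to the parent; this sketch omits that allow-list for brevity:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal child-first class loading sketch: look in this loader's own URLs first,
// and fall back to the parent only when the class is not found locally.
// Real implementations must still delegate core packages (java.*, etc.) to the parent.
public class ChildFirstClassLoaderSketch extends URLClassLoader {

    public ChildFirstClassLoaderSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Child first: try this loader's own URLs before the parent
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not found locally: fall back to the normal parent delegation
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```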

Submitting Jobs via the REST API

SeaTunnel also supports submission through a REST API. To enable it, first add the following configuration to the hazelcast.yaml file:

```yaml
network:
  rest-api:
    enabled: true
    endpoint-groups:
      CLUSTER_WRITE:
        enabled: true
      DATA:
        enabled: true
```

With this configuration in place, the Hazelcast node can accept HTTP requests after startup.

Again taking job submission as the example, let's look at the execution flow.

When we submit a job via the REST API, the client becomes whichever node sends the HTTP request, and the server is the SeaTunnel cluster.
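As a usage sketch, a job can be submitted with a plain HTTP POST. The example below uses the JDK 11+ HttpClient; the host, the default port 5801, and the endpoint path (RestConstant.SUBMIT_JOB_URL, i.e. /hazelcast/rest/maps/submit-job in the 2.3.x line) are assumptions based on a default deployment, and the body is a minimal FakeSource-to-Console job config:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Usage sketch: submit a job over the REST API. Host/port and endpoint path are
// assumptions for a default deployment; adjust them to your own cluster.
public class RestSubmitSketch {
    public static void main(String[] args) throws Exception {
        String body =
                "{"
                        + "\"env\":{\"parallelism\":1,\"job.mode\":\"BATCH\"},"
                        + "\"source\":[{\"plugin_name\":\"FakeSource\",\"result_table_name\":\"fake\",\"row.num\":100}],"
                        + "\"sink\":[{\"plugin_name\":\"Console\",\"source_table_name\":\"fake\"}]"
                        + "}";
        HttpRequest request =
                HttpRequest.newBuilder()
                        .uri(URI.create(
                                "http://127.0.0.1:5801/hazelcast/rest/maps/submit-job?jobName=demo"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```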

When the server receives a request, it invokes the corresponding method according to the request URI:

```java
public void handle(HttpPostCommand httpPostCommand) {
    String uri = httpPostCommand.getURI();
    try {
        if (uri.startsWith(SUBMIT_JOB_URL)) {
            handleSubmitJob(httpPostCommand, uri);
        } else if (uri.startsWith(STOP_JOB_URL)) {
            handleStopJob(httpPostCommand, uri);
        } else if (uri.startsWith(ENCRYPT_CONFIG)) {
            handleEncrypt(httpPostCommand);
        } else {
            original.handle(httpPostCommand);
        }
    } catch (IllegalArgumentException e) {
        prepareResponse(SC_400, httpPostCommand, exceptionResponse(e));
    } catch (Throwable e) {
        logger.warning("An error occurred while handling request " + httpPostCommand, e);
        prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
    }
    this.textCommandService.sendResponse(httpPostCommand);
}
```

The handler method is looked up by path:

```java
private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
        throws IllegalArgumentException {
    Map<String, String> requestParams = new HashMap<>();
    RestUtil.buildRequestParams(requestParams, uri);
    Config config = RestUtil.buildConfig(requestHandle(httpPostCommand), false);

    ReadonlyConfig envOptions = ReadonlyConfig.fromConfig(config.getConfig("env"));
    String jobName = envOptions.get(EnvCommonOptions.JOB_NAME);

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName(
            StringUtils.isEmpty(requestParams.get(RestConstant.JOB_NAME))
                    ? jobName
                    : requestParams.get(RestConstant.JOB_NAME));

    boolean startWithSavePoint =
            Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
    String jobIdStr = requestParams.get(RestConstant.JOB_ID);
    Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null;
    SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
    RestJobExecutionEnvironment restJobExecutionEnvironment =
            new RestJobExecutionEnvironment(
                    seaTunnelServer,
                    jobConfig,
                    config,
                    textCommandService.getNode(),
                    startWithSavePoint,
                    finalJobId);
    JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
    long jobId = jobImmutableInformation.getJobId();
    if (!seaTunnelServer.isMasterNode()) {

        NodeEngineUtil.sendOperationToMasterNode(
                        getNode().nodeEngine,
                        new SubmitJobOperation(
                                jobId,
                                getNode().nodeEngine.toData(jobImmutableInformation),
                                jobImmutableInformation.isStartWithSavePoint()))
                .join();

    } else {
        submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
    }

    this.prepareResponse(
            httpPostCommand,
            new JsonObject()
                    .add(RestConstant.JOB_ID, String.valueOf(jobId))
                    .add(RestConstant.JOB_NAME, jobConfig.getName()));
}
```

The logic here is similar to the client path; since there is no local mode, no local service needs to be created.

On the client side, the ClientJobExecutionEnvironment class performs logical plan parsing and related steps; here, a RestJobExecutionEnvironment does the same job.

At final submission, if the current node is not the master, it sends the information to the master node; from then on the handling is identical to receiving a submission from the command-line client.

If the current node is the master, submitJob is called directly, which invokes coordinatorService.submitJob for the subsequent processing:

```java
private void submitJob(
        SeaTunnelServer seaTunnelServer,
        JobImmutableInformation jobImmutableInformation,
        JobConfig jobConfig) {
    CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
    Data data =
            textCommandService
                    .getNode()
                    .nodeEngine
                    .getSerializationService()
                    .toData(jobImmutableInformation);
    PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
            coordinatorService.submitJob(
                    Long.parseLong(jobConfig.getJobContext().getJobId()),
                    data,
                    jobImmutableInformation.isStartWithSavePoint());
    voidPassiveCompletableFuture.join();
}
```

As we can see, both submission modes parse the logical plan on the submitting side and then send the result to the master node, which performs physical plan parsing, resource allocation, and the remaining work.

This article is published with the support of WhaleOps (白鲸开源科技).
