Apache DolphinScheduler 1.3.9 Source Code Analysis (Part 1)

Introduction

With the growth of big data, task scheduling systems have become a critical part of data processing and management. Apache DolphinScheduler is an excellent open-source distributed workflow scheduling platform that is widely used in big data scenarios.

In this article we take a close look at the source code of Apache DolphinScheduler 1.3.9 and walk through the Master startup and scheduling flow.

With this analysis, developers can better understand how DolphinScheduler works and carry out secondary development or optimization more efficiently.

Master Server Startup

Startup flow chart

Master workflow scheduling flow chart

MasterServer startup method

public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    this.nettyRemotingServer.start();

    // self tolerant
    this.zkMasterClient.start();
    this.zkMasterClient.setStoppable(this);

    // scheduler start
    this.masterSchedulerService.start();

    // start QuartzExecutors
    // what system should do if exception
    try {
        logger.info("start Quartz server...");
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed", e);
    }

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}
  • The nettyServer registers processors for three command types (see the sketch after this list):
  1. TASK_EXECUTE_ACK: after the Worker receives the Master's request to execute a task, it sends an Ack command back to the Master to report that it has started executing the task.
  2. TASK_EXECUTE_RESPONSE: after the Worker finishes executing a task, it sends a Response command to the Master to report the task's execution result.
  3. TASK_KILL_RESPONSE: after the Master receives a request to stop a task, it sends a TASK_KILL_REQUEST command to the Worker, and the Worker then returns a TASK_KILL_RESPONSE command to the Master.
  • Start the scheduler service and the Quartz timer.
  • Register a shutdown hook that releases resources on exit.
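
For illustration, the sketch below shows roughly what one of the command processors registered above could look like. The NettyRequestProcessor interface with process(Channel, Command) and the Command accessors follow my reading of the 1.3.x remote module and should be treated as assumptions; the body is simplified and is not the actual TaskAckProcessor.

import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

// Simplified, illustrative processor: the real TaskAckProcessor also updates the
// task-instance cache and hands the ack over to the master's response service.
public class SimpleAckProcessor implements NettyRequestProcessor {
    @Override
    public void process(Channel channel, Command command) {
        // only handle the command type this processor was registered for
        if (command.getType() == CommandType.TASK_EXECUTE_ACK) {
            // the command body carries the serialized ack payload sent by the worker
            byte[] body = command.getBody();
            // ... deserialize the ack and mark the task instance as RUNNING (omitted)
        }
    }
}

Such a processor would then be registered with nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new SimpleAckProcessor()), just like the processors in the startup method above.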

Master configuration file (master.properties)

master.listen.port=5678
# number of threads used to schedule process instances concurrently
master.exec.threads=100
# number of tasks a single process instance can run in parallel
master.exec.task.num=20
# number of tasks dispatched in each batch
master.dispatch.task.num=3
# the master picks a suitable worker to execute the task;
# available algorithms: Random, RoundRobin, LowerWeight (default: LowerWeight)
master.host.selector=LowerWeight
# interval (in seconds) at which the master sends heartbeats to ZooKeeper
master.heartbeat.interval=10
# number of retries when the master fails to commit a task
master.task.commit.retryTimes=5
# retry interval (in milliseconds) when the master fails to commit a task
master.task.commit.interval=1000
# maximum average CPU load; the master only schedules tasks while the system CPU load average is below this value
# default is -1, which means (number of CPU cores * 2)
master.max.cpuload.avg=-1
# memory reserved for other processes; the master only schedules tasks while the available system memory is above this value
# default is 0.3G
master.reserved.memory=0.3
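
The master.host.selector comment above mentions three worker-selection algorithms: Random, RoundRobin and LowerWeight. As a rough illustration of the idea only (this is not DolphinScheduler's actual host-selection code; the class and method names are made up for the example), a minimal round-robin selector could look like this:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: shows the round-robin idea behind master.host.selector=RoundRobin.
// The real master also supports Random and LowerWeight (pick the worker with the lowest load/weight).
public class RoundRobinSelector {
    private final AtomicInteger index = new AtomicInteger(0);

    public String select(List<String> workers) {
        if (workers == null || workers.isEmpty()) {
            throw new IllegalArgumentException("no worker available");
        }
        // advance the counter and wrap around the worker list
        int next = Math.abs(index.getAndIncrement() % workers.size());
        return workers.get(next);
    }
}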

Master Scheduler Startup

MasterSchedulerService initialization method

public void init() {
    // masterConfig.getMasterExecThreads() comes from master.exec.threads=100 in master.properties;
    // the pool's core size and maximum size are both 100
    this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}

MasterSchedulerService run method

public void run() {logger.info("master scheduler started");while (Stopper.isRunning()){try {// 这个方法是用来检查master cpu load和memory,判断master是否还有资源进行调度// 如果不能调度,Sleep 1 秒种boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());if(!runCheckFlag) {Thread.sleep(Constants.SLEEP_TIME_MILLIS);continue;}if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {// 这里才是真正去执行调度的方法scheduleProcess();}} catch (Exception e) {logger.error("master scheduler thread error", e);}}
}
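
OSUtils.checkResource gates scheduling on the system CPU load average and available memory (master.max.cpuload.avg and master.reserved.memory). The sketch below shows the same idea with standard JVM APIs; note that the real OSUtils reads system-wide available memory rather than JVM free memory, so the memory source and class name here are assumptions.

import java.lang.management.ManagementFactory;

public class ResourceGate {
    // returns true when the load average is below maxCpuLoadAvg and free memory
    // (in GB) is above reservedMemoryGb -- illustrative only, not the real OSUtils
    public static boolean checkResource(double maxCpuLoadAvg, double reservedMemoryGb) {
        double loadAvg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        double freeGb = Runtime.getRuntime().freeMemory() / 1024.0 / 1024.0 / 1024.0;
        return loadAvg < maxCpuLoadAvg && freeGb > reservedMemoryGb;
    }
}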

MasterSchedulerService scheduling method

private void scheduleProcess() throws Exception {
    InterProcessMutex mutex = null;
    try {
        // acquire the distributed lock (blocking)
        mutex = zkMasterClient.blockAcquireMutex();

        // number of active threads in the exec thread pool
        int activeCount = masterExecService.getActiveCount();

        // make sure to scan and delete the command table in one transaction
        // fetch one command; all operations on it must stay inside a single transaction
        Command command = processService.findOneCommand();
        if (command != null) {
            logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
            try {
                // build the ProcessInstance;
                // this method decides whether the process instance can be scheduled,
                // based on master.exec.threads and the current number of active threads
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount, command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
                }
            } catch (Exception e) {
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
        } else {
            // indicate that there is no command, sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } finally {
        // release the distributed lock
        zkMasterClient.releaseMutex(mutex);
    }
}
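
zkMasterClient.blockAcquireMutex() wraps Curator's InterProcessMutex so that only one master at a time scans the command table. Below is a standalone sketch of that locking pattern with Curator; the ZooKeeper address and lock path are placeholders, not DolphinScheduler's actual values.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class MasterLockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // all masters compete for the same lock path; only one holds it at a time
        InterProcessMutex mutex = new InterProcessMutex(client, "/dolphinscheduler/lock/masters-demo");
        mutex.acquire();          // blocks until the lock is held
        try {
            // critical section: scan the command table, build a process instance, ...
        } finally {
            mutex.release();      // always release so other masters can proceed
        }
        client.close();
    }
}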

ProcessService command handling method

public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
    // build the ProcessInstance from the command
    ProcessInstance processInstance = constructProcessInstance(command, host);
    // cannot construct process instance, return null
    if (processInstance == null) {
        logger.error("scan command, command parameter is error: {}", command);
        moveToErrorCommand(command, "process instance is null");
        return null;
    }
    // check whether the number of remaining threads covers this process definition
    // and all of its sub-processes; if not, the process instance state is set to
    // "wait thread" and an empty process instance is returned
    if (!checkThreadNum(command, validThreadNum)) {
        logger.info("there is not enough thread for this command: {}", command);
        return setWaitingThreadProcess(command, processInstance);
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    saveProcessInstance(processInstance);
    this.setSubProcessParam(processInstance);
    delCommandByid(command.getId());
    return processInstance;
}
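
checkThreadNum compares the caller-supplied validThreadNum (master.exec.threads minus the number of active threads) with the number of process instances the command would occupy, including sub-processes. The sketch below illustrates only that idea; the class, method and counting details are assumptions, not the actual checkThreadNum source.

// Illustrative only: the real checkThreadNum counts the process definition
// and all of its sub-process definitions before deciding.
public class ThreadCheckDemo {
    static boolean hasEnoughThreads(int validThreadNum, int subProcessCount) {
        // one exec thread for the parent process instance plus one per sub-process
        int threadsNeeded = 1 + subProcessCount;
        return validThreadNum >= threadsNeeded;
    }

    public static void main(String[] args) {
        // e.g. 3 free exec threads: a workflow with 2 sub-workflows still fits
        System.out.println(hasEnoughThreads(3, 2)); // true
    }
}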

MasterExecThread constructor

public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient) {
    this.processService = processService;
    this.processInstance = processInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    // master.exec.task.num from master.properties
    int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum);
    this.nettyRemotingClient = nettyRemotingClient;
}

MasterExecThread run method

public void run() {
    // omitted ...
    try {
        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
            // complement-data (backfill) logic, not covered here
            executeComplementProcess();
        } else {
            // execute the tasks of this process instance
            executeProcess();
        }
    } catch (Exception e) {
        logger.error("master exec thread exception", e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processService.updateProcessInstance(processInstance);
    } finally {
        taskExecService.shutdown();
    }
}

private void executeProcess() throws Exception {
    // pre-processing
    prepareProcess();
    // run
    runProcess();
    // post-processing
    endProcess();
}

private void runProcess() {
    // submit starting from the root tasks
    submitPostNode(null);
    boolean sendTimeWarning = false;
    while (!processInstance.isProcessInstanceStop() && Stopper.isRunning()) {
        // some code omitted ...
        // decide whether tasks can be submitted, based on CPU load average and memory
        if (canSubmitTaskToQueue()) {
            submitStandByTask();
        }
        try {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        updateProcessInstanceState();
    }
    logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}

// collect the tasks that can run in parallel
/**
* task 1 -> task 2 -> task3
* task 4 -> task 5
* task 6
* task 1, task 4 and task 6 can run in parallel
*/
private void submitPostNode(String parentNodeName) {
    Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
    List<TaskInstance> taskInstances = new ArrayList<>();
    for (String taskNode : submitTaskNodeList) {
        taskInstances.add(createTaskInstance(processInstance, taskNode, dag.getNode(taskNode)));
    }
    // if the previous node succeeded, submit the post node
    for (TaskInstance task : taskInstances) {
        if (readyToSubmitTaskQueue.contains(task)) {
            continue;
        }
        if (completeTaskList.containsKey(task.getName())) {
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
        } else {
            // add the task to the priority queue of tasks waiting to be submitted
            addTaskToStandByList(task);
        }
    }
}

/**
 * handling the list of tasks to be submitted
 */
private void submitStandByTask() {
    try {
        int length = readyToSubmitTaskQueue.size();
        for (int i = 0; i < length; i++) {
            // take a task from the queue and submit it to a worker
            TaskInstance task = readyToSubmitTaskQueue.peek();
            // first check whether all upstream dependencies of the task have succeeded;
            // only then is the task submitted. If a dependency failed or has not run, it is not submitted.
            DependResult dependResult = getDependResultForTask(task);
            if (DependResult.SUCCESS == dependResult) {
                if (retryTaskIntervalOverTime(task)) {
                    submitTaskExec(task);
                    removeTaskFromStandbyList(task);
                }
            } else if (DependResult.FAILED == dependResult) {
                // if the dependency fails, the current node is not submitted and the state changes to failure.
                dependFailedTask.put(task.getName(), task);
                removeTaskFromStandbyList(task);
                logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
            } else if (DependResult.NON_EXEC == dependResult) {
                // for some reasons (depend task pause/stop) this task will not be submitted
                removeTaskFromStandbyList(task);
                logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
            }
        }
    } catch (Exception e) {
        logger.error("submit standby task error", e);
    }
}

/**
 * create the TaskExecThread for a task instance
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if (taskInstance.isSubProcess()) {
        abstractExecThread = new SubProcessTaskExecThread(taskInstance);
    } else if (taskInstance.isDependTask()) {
        abstractExecThread = new DependentTaskExecThread(taskInstance);
    } else if (taskInstance.isConditionsTask()) {
        abstractExecThread = new ConditionsTaskExecThread(taskInstance);
    } else {
        abstractExecThread = new MasterTaskExecThread(taskInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}
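
submitPostNode relies on DagHelper.parsePostNodes to find the nodes whose predecessors have all completed. The toy example below illustrates only the idea behind it, using a plain adjacency map instead of DolphinScheduler's DAG classes.

import java.util.*;

public class ReadyNodesDemo {
    // returns the nodes whose predecessors are all in the completed set -- illustrative only
    static Set<String> readyNodes(Map<String, List<String>> predecessors, Set<String> completed) {
        Set<String> ready = new HashSet<>();
        for (Map.Entry<String, List<String>> e : predecessors.entrySet()) {
            if (!completed.contains(e.getKey()) && completed.containsAll(e.getValue())) {
                ready.add(e.getKey());
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        // task1 -> task2 -> task3, task4 -> task5, task6
        Map<String, List<String>> pred = new HashMap<>();
        pred.put("task1", List.of());
        pred.put("task2", List.of("task1"));
        pred.put("task3", List.of("task2"));
        pred.put("task4", List.of());
        pred.put("task5", List.of("task4"));
        pred.put("task6", List.of());
        // before anything has run, task1, task4 and task6 are ready and can run in parallel
        System.out.println(readyNodes(pred, new HashSet<>()));
    }
}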

MasterBaseTaskExecThread

MasterBaseTaskExecThread is the parent class of SubProcessTaskExecThread, DependentTaskExecThread, ConditionsTaskExecThread and MasterTaskExecThread, and it implements the Callable interface.

  • SubProcessTaskExecThread

    The task instance is not dispatched to a worker node. In submitTask(TaskInstance taskInstance), a command for a sub-process instance is created, and the waitTaskQuit method then loops until the sub-workflow finishes. After the current workflow run ends, the sub-workflow keeps running and its state is kept up to date; only when the sub-workflow is completely finished is the task state synchronized to the sub-workflow's state.

  • DependentTaskExecThread

    A Dependent node is a dependency-check node. For example, if workflow A depends on yesterday's run of workflow B having succeeded, the dependent node checks whether B has a successful instance for yesterday.

  • ConditionsTaskExecThread

    A Conditions node is a conditional node that decides which downstream task to run based on the state of its upstream tasks. Currently a Conditions node supports multiple upstream tasks but only two downstream branches. When there is more than one upstream task, complex upstream dependencies can be expressed with the "and" and "or" operators (see the sketch after this list).

  • MasterTaskExecThread

    The task instance is dispatched to a worker node for execution, and the waitTaskQuit method loops until the task instance finishes, exiting once it completes. This applies to task types such as SQL and Shell.
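
To make the "and"/"or" combination of upstream results concrete, the sketch below merges a list of dependency results with a relation operator. The enum and helper are simplified stand-ins, not the actual DependentUtils API.

import java.util.List;

public class DependMergeDemo {
    enum DependResult { SUCCESS, FAILED }

    // merge a list of upstream results with an AND or OR relation -- illustrative only
    static DependResult merge(String relation, List<DependResult> results) {
        boolean success;
        if ("AND".equalsIgnoreCase(relation)) {
            success = results.stream().allMatch(r -> r == DependResult.SUCCESS);
        } else { // OR
            success = results.stream().anyMatch(r -> r == DependResult.SUCCESS);
        }
        return success ? DependResult.SUCCESS : DependResult.FAILED;
    }

    public static void main(String[] args) {
        System.out.println(merge("AND", List.of(DependResult.SUCCESS, DependResult.FAILED))); // FAILED
        System.out.println(merge("OR", List.of(DependResult.SUCCESS, DependResult.FAILED)));  // SUCCESS
    }
}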

MasterBaseTaskExecThread constructor

public MasterBaseTaskExecThread(TaskInstance taskInstance) {
    this.processService = SpringApplicationContext.getBean(ProcessService.class);
    this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
    this.cancel = false;
    this.taskInstance = taskInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
    initTaskParams();
}

MasterBaseTaskExecThread call method

@Override
public Boolean call() throws Exception {
    this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
    return submitWaitComplete(); // implemented by each subclass
}

Common methods of MasterBaseTaskExecThread

submit()

protected TaskInstance submit() {
    // number of commit retries: master.task.commit.retryTimes=5
    Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
    // retry interval when committing fails: master.task.commit.interval=1000
    Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();

    int retryTimes = 1;
    boolean submitDB = false;
    boolean submitTask = false;
    TaskInstance task = null;
    while (retryTimes <= commitRetryTimes) {
        try {
            if (!submitDB) {
                // persist the TaskInstance to the database
                task = processService.submitTask(taskInstance);
                if (task != null && task.getId() != 0) {
                    submitDB = true;
                }
            }
            if (submitDB && !submitTask) {
                // dispatch the task to a worker for execution
                submitTask = dispatchTask(task);
            }
            if (submitDB && submitTask) {
                return task;
            }
            if (!submitDB) {
                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
            } else if (!submitTask) {
                logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
            }
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and dispatch task failed", e);
        }
        retryTimes += 1;
    }
    return task;
}

dispatchTask(TaskInstance task)

public Boolean dispatchTask(TaskInstance taskInstance) {
    try {
        // sub-process, conditions and dependent tasks return true directly and are not dispatched to a worker
        if (taskInstance.isConditionsTask()
                || taskInstance.isDependTask()
                || taskInstance.isSubProcess()) {
            return true;
        }
        if (taskInstance.getState().typeIsFinished()) {
            logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
            return true;
        }
        // a task cannot be submitted while it is already running
        if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION) {
            logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to submit: {}", taskInstance);

        /**
         * taskPriority
         */
        TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
                processInstance.getId(),
                taskInstance.getProcessInstancePriority().getCode(),
                taskInstance.getId(),
                org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
        // put the task into the TaskPriorityQueue;
        // org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl is consumed elsewhere:
        // TaskInstances are taken from the queue and dispatched to workers for execution
        taskUpdateQueue.put(taskPriority);
        logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
        return true;
    } catch (Exception e) {
        logger.error("submit task Exception: ", e);
        logger.error("task error : %s", JSONUtils.toJson(taskInstance));
        return false;
    }
}
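
taskUpdateQueue orders tasks so that higher-priority process instances and tasks are dispatched first. The sketch below shows the same idea with a standard PriorityBlockingQueue; the priority fields and the comparator are simplified assumptions, not the actual TaskPriority implementation.

import java.util.concurrent.PriorityBlockingQueue;

public class TaskQueueDemo {
    // smaller value = higher priority, mirroring the priority codes -- illustrative only
    static class QueuedTask implements Comparable<QueuedTask> {
        final int processInstancePriority;
        final int taskPriority;
        final int taskId;

        QueuedTask(int processInstancePriority, int taskPriority, int taskId) {
            this.processInstancePriority = processInstancePriority;
            this.taskPriority = taskPriority;
            this.taskId = taskId;
        }

        @Override
        public int compareTo(QueuedTask other) {
            // compare process instance priority first, then task priority
            int byProcess = Integer.compare(processInstancePriority, other.processInstancePriority);
            return byProcess != 0 ? byProcess : Integer.compare(taskPriority, other.taskPriority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<QueuedTask> queue = new PriorityBlockingQueue<>();
        queue.put(new QueuedTask(2, 1, 101));
        queue.put(new QueuedTask(1, 3, 102));
        // the task belonging to the higher-priority process instance (102) is taken first
        System.out.println(queue.take().taskId);
    }
}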

MasterTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    Boolean result = false;
    // submit the task
    this.taskInstance = submit();
    if (this.taskInstance == null) {
        logger.error("submit task instance to mysql and queue failed , please check and fix it");
        return result;
    }
    if (!this.taskInstance.getState().typeIsFinished()) {
        // wait for the task execution result
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processService.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
    return result;
}

waitTaskQuit()

public Boolean waitTaskQuit() {
    // query the latest state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
            this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // the task instance is added to the kill queue, waiting for the worker to kill it
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance();
            }
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                pauseTask();
            }
            // the task instance has finished
            if (taskInstance.getState().typeIsFinished()) {
                // if the task has reached its final state, remove it from the cache;
                // taskInstanceCacheManager is implemented by org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
                // a taskInstance is added to the cache when the ack and response commands are processed
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // refresh the task instance and the process instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

SubProcessTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    Boolean result = false;
    try {
        // submit task instance
        this.taskInstance = submit();
        if (taskInstance == null) {
            logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
            return result;
        }
        setTaskInstanceState();
        waitTaskQuit();
        subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());

        // at the end of the subflow, the task state is changed to the subflow state
        if (subProcessInstance != null) {
            if (subProcessInstance.getState() == ExecutionStatus.STOP) {
                this.taskInstance.setState(ExecutionStatus.KILL);
            } else {
                this.taskInstance.setState(subProcessInstance.getState());
            }
        }
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
        result = true;
    } catch (Exception e) {
        logger.error("exception: ", e);
        if (null != taskInstance) {
            logger.error("wait task quit failed, instance id:{}, task id:{}",
                    processInstance.getId(), taskInstance.getId());
        }
    }
    return result;
}

waitTaskQuit()

private void waitTaskQuit() throws InterruptedException {
    logger.info("wait sub work flow: {} complete", this.taskInstance.getName());
    if (taskInstance.getState().typeIsFinished()) {
        logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState(),
                this.processInstance.getState());
        return;
    }
    while (Stopper.isRunning()) {
        // waiting for the subflow process instance to be created
        if (subProcessInstance == null) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            if (!setTaskInstanceState()) {
                continue;
            }
        }
        subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
        if (checkTaskTimeout()) {
            this.checkTimeoutFlag = !alertTimeout();
            handleTimeoutFailed();
        }
        updateParentProcessState();
        if (subProcessInstance.getState().typeIsFinished()) {
            break;
        }
        if (this.processInstance.getState() == ExecutionStatus.READY_PAUSE) {
            // parent process "ready to pause", child process "pause"
            pauseSubProcess();
        } else if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
            // parent process "ready to stop", subflow "cancel"
            stopSubProcess();
        }
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
}

ConditionsTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try {
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        logger.info("dependent task start");
        // wait and evaluate the conditions
        waitTaskQuit();
        // update the final condition/dependency result
        updateTaskState();
    } catch (Exception e) {
        logger.error("conditions task run exception", e);
    }
    return true;
}

waitTaskQuit()

private void waitTaskQuit() {
    List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
    for (TaskInstance task : taskInstances) {
        completeTaskList.putIfAbsent(task.getName(), task.getState());
    }

    // collect the dependency result of every dependent model
    List<DependResult> modelResultList = new ArrayList<>();
    for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
        List<DependResult> itemDependResult = new ArrayList<>();
        for (DependentItem item : dependentTaskModel.getDependItemList()) {
            itemDependResult.add(getDependResultForItem(item));
        }
        DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
        modelResultList.add(modelResult);
    }
    // merge the dependency results according to the logical operator (and/or)
    conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);
    logger.info("the conditions task depend result : {}", conditionResult);
}

DependentTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try {
        logger.info("dependent task start");
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        initDependParameters();
        waitTaskQuit();
        updateTaskState();
    } catch (Exception e) {
        logger.error("dependent task run exception", e);
    }
    return true;
}

waitTaskQuit()

private Boolean waitTaskQuit() {
    logger.info("wait depend task : {} complete", this.taskInstance.getName());
    if (taskInstance.getState().typeIsFinished()) {
        logger.info("task {} already complete. task state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState());
        return true;
    }
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // some code omitted ...
            // allDependentTaskFinish() waits until all dependent tasks have finished
            if (allDependentTaskFinish() || taskInstance.getState().typeIsFinished()) {
                break;
            }
            // refresh the task instance and the process instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

TaskPriorityQueueConsumer

@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()) {
        try {
            // number of tasks dispatched per batch: master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for (int i = 0; i < fetchTaskNum; i++) {
                if (taskPriorityQueue.size() <= 0) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // if there is no task, block here
                // take a task from the queue
                TaskPriority taskPriority = taskPriorityQueue.take();
                // dispatch it to a worker for execution
                boolean dispatchResult = dispatch(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // tasks that failed to dispatch are put back into the queue and re-dispatched later
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            logger.error("dispatcher task error", e);
        }
    }
}

/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
        if (taskInstanceIsFinalState(taskInstanceId)) {
            // when the task has finished, ignore it; there is no need to dispatch it anymore
            return true;
        } else {
            // dispatch the task to a worker;
            // supported dispatch algorithms: lower-weight (low load first), random, round robin
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}", e.getMessage());
    }
    return result;
}

Through this analysis of the Apache DolphinScheduler 1.3.9 source code, we have examined the design and implementation of the Master's core modules. The Master architecture keeps task scheduling highly available and scalable, and the ZooKeeper-based cluster coordination gives the system strong fault tolerance.

If you are interested in the Apache DolphinScheduler source code, you can dig deeper into the details of its task scheduling strategies, or extend it for your own business scenarios to make full use of its scheduling capabilities.

End of article.

Publication of this article is supported by 白鲸开源科技 (WhaleOps).

最近遇到一个需求需要使用uniapp的map自定义气泡 &#xff0c;做完之后发现在模拟器上好好的&#xff0c;ios真机不显示&#xff0c;安卓页数时好时不好的 一番查询发现是小程序的老问题了&#xff0c;网上的方法都试了也没能解决 后来看到有人说用nvue可以正常显示&#xff0c…