Apache DolphinScheduler 1.3.9 Source Code Analysis (Part 2)

Introduction

With the growth of big data, task scheduling systems have become a critical part of data processing and management. Apache DolphinScheduler is an excellent open-source distributed workflow scheduling platform that is widely used in big data scenarios.

In this article we take a deeper look at the source code of Apache DolphinScheduler 1.3.9, focusing on the interaction design between the Master and the Worker.

If you are interested, you can also review our previous article: Apache DolphinScheduler 1.3.9 Source Code Analysis (Part 1).

Worker Configuration File

# worker listener port
worker.listen.port=1234

# worker execute thread number to limit task instances in parallel
worker.exec.threads=100

# worker heartbeat interval, the unit is second
worker.heartbeat.interval=10

# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
worker.max.cpuload.avg=-1

# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
worker.reserved.memory=0.3

# default worker groups separated by comma, like 'worker.groups=default,test'
worker.groups=default
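
The last two resource settings gate task dispatch: a worker only accepts new tasks while the system load average stays below worker.max.cpuload.avg and the available memory stays above worker.reserved.memory. Below is a minimal sketch of that check; it is not the actual DolphinScheduler implementation, and the class and parameter names are illustrative assumptions.

public class WorkerResourceCheckSketch {
    /**
     * @param loadAverage       current system load average
     * @param availableMemoryGB currently available memory, in GB
     * @param maxCpuLoadAvg     worker.max.cpuload.avg (-1 means "number of cpu cores * 2")
     * @param reservedMemoryGB  worker.reserved.memory, in GB
     */
    public static boolean canAcceptTask(double loadAverage, double availableMemoryGB,
                                        double maxCpuLoadAvg, double reservedMemoryGB) {
        double effectiveMaxLoad = maxCpuLoadAvg < 0
                ? Runtime.getRuntime().availableProcessors() * 2.0
                : maxCpuLoadAvg;
        // dispatchable only while load is below the ceiling and enough memory remains free
        return loadAverage < effectiveMaxLoad && availableMemoryGB > reservedMemoryGB;
    }
}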

WorkerServer Startup

public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(workerConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
    this.nettyRemotingServer.start();

    // worker registry
    try {
        this.workerRegistry.registry();
        this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
        Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
        this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }

    // retry report task status
    this.retryReportTaskStatusThread.start();

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}
Four command processors are registered:
  1. TASK_EXECUTE_REQUEST: task execution request
  2. TASK_KILL_REQUEST: task kill request
  3. DB_TASK_ACK: the Master's acknowledgement, sent back to the Worker after the task ACK has been persisted to the database
  4. DB_TASK_RESPONSE: the Master's acknowledgement, sent back to the Worker after the task result has been persisted to the database
Besides registering these processors, run() also:
  • registers the WorkerServer in ZooKeeper and sends heartbeats
  • starts RetryReportTaskStatusThread, which re-reports task execution status

RetryReportTaskStatusThread

This is a fallback mechanism: it periodically polls and re-reports task status to the Master until the Master acknowledges the status with an ACK, so that task status is never lost.

Every 5 minutes it checks whether the ACK cache and the response cache inside ResponceCache are empty; if they are not, it resends the corresponding ack command / response command to the Master.

public void run() {
    ResponceCache responceCache = ResponceCache.get();
    while (Stopper.isRunning()) {
        // sleep 5 minutes
        ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
        try {
            if (!responceCache.getAckCache().isEmpty()) {
                Map<Integer, Command> ackCache = responceCache.getAckCache();
                for (Map.Entry<Integer, Command> entry : ackCache.entrySet()) {
                    Integer taskInstanceId = entry.getKey();
                    Command ackCommand = entry.getValue();
                    taskCallbackService.sendAck(taskInstanceId, ackCommand);
                }
            }
            if (!responceCache.getResponseCache().isEmpty()) {
                Map<Integer, Command> responseCache = responceCache.getResponseCache();
                for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
                    Integer taskInstanceId = entry.getKey();
                    Command responseCommand = entry.getValue();
                    taskCallbackService.sendResult(taskInstanceId, responseCommand);
                }
            }
        } catch (Exception e) {
            logger.warn("retry report task status error", e);
        }
    }
}
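
For reference, the ResponceCache used above keeps one pending command per task instance, split into an ACK cache and a response cache. The sketch below is inferred only from the calls made in this article (cache(), getAckCache(), getResponseCache(), removeAckCache(), removeResponseCache()); the field names and the placeholder Command/Event types are assumptions, not the real classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class Command { }              // placeholder for the remote Command type
enum Event { ACK, RESULT }     // placeholder for the worker-side Event enum

public class ResponceCacheSketch {
    private static final ResponceCacheSketch INSTANCE = new ResponceCacheSketch();
    // taskInstanceId -> pending command, kept until the master confirms DB persistence
    private final Map<Integer, Command> ackCache = new ConcurrentHashMap<>();
    private final Map<Integer, Command> responseCache = new ConcurrentHashMap<>();

    public static ResponceCacheSketch get() { return INSTANCE; }

    public void cache(Integer taskInstanceId, Command command, Event event) {
        if (event == Event.ACK) {
            ackCache.put(taskInstanceId, command);
        } else {
            responseCache.put(taskInstanceId, command);
        }
    }

    public Map<Integer, Command> getAckCache() { return ackCache; }
    public Map<Integer, Command> getResponseCache() { return responseCache; }
    public void removeAckCache(Integer taskInstanceId) { ackCache.remove(taskInstanceId); }
    public void removeResponseCache(Integer taskInstanceId) { responseCache.remove(taskInstanceId); }
}

Entries are added by TaskExecuteProcessor (ACK) and TaskExecuteThread (result) and removed by DBTaskAckProcessor / DBTaskResponseProcessor, as shown later in this article.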

Interaction Design between Master and Worker

The Apache DolphinScheduler Master and Worker modules are two independent JVM processes that can be deployed on different servers. All communication between Master and Worker is RPC over Netty, using seven processors in total.

Master-side processors:
  • TaskResponseProcessor: handles TaskExecuteResponseCommand messages and adds them to the task response queue in TaskResponseService
  • TaskAckProcessor: handles TaskExecuteAckCommand messages and adds them to the task response queue in TaskResponseService
  • TaskKillResponseProcessor: handles TaskKillResponseCommand messages and prints their content to the log
Worker-side processors:
  • TaskExecuteProcessor: handles TaskExecuteRequestCommand messages, sends a TaskExecuteAckCommand back to the Master, and submits the task for execution
  • TaskKillProcessor: handles TaskKillRequestCommand messages, kills the task's process with "kill -9 pid", and sends a TaskKillResponseCommand back to the Master
  • DBTaskAckProcessor: handles DBTaskAckCommand messages and, when the returned status is SUCCESS, removes the cached ACK from ResponceCache
  • DBTaskResponseProcessor: handles DBTaskResponseCommand messages and, when the returned status is SUCCESS, removes the cached result from ResponceCache
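
All seven processors share the same callback shape seen throughout this article: each one is registered against a CommandType on the Netty server (see WorkerServer.run() above) and receives the raw Command together with the Netty Channel it arrived on. A self-contained sketch of that shape is shown below; the placeholder Command class stands in for the real remote Command, which carries a command type, an opaque id and a byte[] body.

import io.netty.channel.Channel;

class Command { }   // placeholder for org.apache.dolphinscheduler.remote.command.Command

public interface NettyRequestProcessor {
    // deserialize command.getBody(), do the work, and optionally reply on the channel
    void process(Channel channel, Command command);
}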

How Task Dispatch Works

master#TaskPriorityQueueConsumer

The Master runs a TaskPriorityQueueConsumer that, on each batch, takes up to 3 tasks from the TaskPriorityQueue and dispatches them to Workers; this is where the TaskExecuteRequestCommand is created.

TaskPriorityQueueConsumer#run()

@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()) {
        try {
            // number of tasks dispatched per batch, master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for (int i = 0; i < fetchTaskNum; i++) {
                if (taskPriorityQueue.size() <= 0) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // if not task , blocking here
                // take a task from the queue
                TaskPriority taskPriority = taskPriorityQueue.take();
                // dispatch it to a worker
                boolean dispatchResult = dispatch(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // tasks that failed to dispatch are put back into the queue, waiting to be dispatched again
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            logger.error("dispatcher task error", e);
        }
    }
}

dispatcher

/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        // the TaskExecuteRequestCommand is created here
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
        if (taskInstanceIsFinalState(taskInstanceId)) {
            // when task finish, ignore this task, there is no need to dispatch anymore
            return true;
        } else {
            // dispatch the task
            // supported dispatch algorithms: lower-weight (low load first), random, round robin
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}", e.getMessage());
    }
    return result;
}

TaskExecutionContext

// excerpt from org.apache.dolphinscheduler.server.entity.TaskExecutionContext#toCommand
public Command toCommand() {
    TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
    requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
    return requestCommand.convert2Command();
}

Dispatch Algorithm Implementations

Random algorithm

public class RandomSelector<T> implements Selector<T> {

    private final Random random = new Random();

    public T select(final Collection<T> source) {
        if (source == null || source.size() == 0) {
            throw new IllegalArgumentException("Empty source.");
        }
        if (source.size() == 1) {
            return (T) source.toArray()[0];
        }
        int size = source.size();
        int randomIndex = random.nextInt(size);
        return (T) source.toArray()[randomIndex];
    }
}

Round-robin algorithm

public class RoundRobinSelector<T> implements Selector<T> {

    private final AtomicInteger index = new AtomicInteger(0);

    public T select(Collection<T> source) {
        if (source == null || source.size() == 0) {
            throw new IllegalArgumentException("Empty source.");
        }
        if (source.size() == 1) {
            return (T) source.toArray()[0];
        }
        int size = source.size();
        return (T) source.toArray()[index.getAndIncrement() % size];
    }
}

Lower-weight (low load first) algorithm

public class LowerWeightRoundRobin implements Selector<HostWeight> {

    public HostWeight select(Collection<HostWeight> sources) {
        int totalWeight = 0;
        int lowWeight = 0;
        HostWeight lowerNode = null;
        for (HostWeight hostWeight : sources) {
            totalWeight += hostWeight.getWeight();
            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
                lowerNode = hostWeight;
                lowWeight = hostWeight.getCurrentWeight();
            }
        }
        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
        return lowerNode;
    }
}
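
To see what this loop actually does, here is a small self-contained re-implementation of the same lowest-current-weight selection over a stand-in host type (DemoHostWeight is hypothetical, not the real HostWeight): on each call every candidate's current weight grows by its own weight, the smallest current weight wins, and the winner is penalised by the total weight, so candidates with a smaller weight value get selected proportionally more often.

import java.util.Arrays;
import java.util.List;

public class LowerWeightDemo {

    // hypothetical stand-in for HostWeight, reduced to the fields the selector uses
    static class DemoHostWeight {
        final String host;
        final int weight;
        int currentWeight;

        DemoHostWeight(String host, int weight) {
            this.host = host;
            this.weight = weight;
        }
    }

    // same selection loop as LowerWeightRoundRobin above, typed to the stand-in class
    static DemoHostWeight select(List<DemoHostWeight> sources) {
        int totalWeight = 0;
        int lowWeight = 0;
        DemoHostWeight lowerNode = null;
        for (DemoHostWeight hw : sources) {
            totalWeight += hw.weight;
            hw.currentWeight += hw.weight;
            if (lowerNode == null || lowWeight > hw.currentWeight) {
                lowerNode = hw;
                lowWeight = hw.currentWeight;
            }
        }
        lowerNode.currentWeight += totalWeight;
        return lowerNode;
    }

    public static void main(String[] args) {
        List<DemoHostWeight> hosts = Arrays.asList(
                new DemoHostWeight("worker-1", 1),   // smaller weight -> chosen more often
                new DemoHostWeight("worker-2", 3));
        for (int i = 0; i < 8; i++) {
            System.out.println(select(hosts).host);  // worker-1 is printed 6 times out of 8
        }
    }
}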

TaskExecuteRequestCommand

TaskExecuteProcessor
Constructor
public TaskExecuteProcessor() {
    this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
    this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
    // worker.exec.threads, default 100
    this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
    this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
The process() method
public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    // deserialize the TaskExecuteRequestCommand
    TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
            command.getBody(), TaskExecuteRequestCommand.class);
    logger.info("received command : {}", taskRequestCommand);

    if (taskRequestCommand == null) {
        logger.error("task execute request command is null");
        return;
    }

    String contextJson = taskRequestCommand.getTaskExecutionContext();
    TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
    if (taskExecutionContext == null) {
        logger.error("task execution context is null");
        return;
    }

    // store it in taskExecutionContextCacheManager
    setTaskCache(taskExecutionContext);

    // create the task logger
    Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
            taskExecutionContext.getProcessDefineId(),
            taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId()));

    taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
    taskExecutionContext.setStartTime(new Date());
    taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext));

    // local execute path
    String execLocalPath = getExecLocalPath(taskExecutionContext);
    logger.info("task instance local execute path : {}", execLocalPath);
    taskExecutionContext.setExecutePath(execLocalPath);

    // store the task logger in a ThreadLocal
    FileUtils.taskLoggerThreadLocal.set(taskLogger);
    try {
        // create the execution directory
        FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
    } catch (Throwable ex) {
        String errorLog = String.format("create execLocalPath : %s", execLocalPath);
        LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
        LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
        taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
    }
    FileUtils.taskLoggerThreadLocal.remove();

    taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
            new NettyRemoteChannel(channel, command.getOpaque()));

    // send a TaskExecuteAckCommand to the master
    this.doAck(taskExecutionContext);

    // submit task
    workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
}

private void doAck(TaskExecutionContext taskExecutionContext) {
    // tell master that task is in executing
    TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
    ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
    taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}

TaskExecuteThread

Constructor
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger) {
    this.taskExecutionContext = taskExecutionContext;
    this.taskCallbackService = taskCallbackService;
    this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
    this.taskLogger = taskLogger;
}
The run() method
public void run() {
    TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
    try {
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        // task node
        TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);

        // copy hdfs/minio file to local
        // download required resources, e.g. Spark/Flink jars, UDFs, etc.
        downloadResource(taskExecutionContext.getExecutePath(),
                taskExecutionContext.getResources(),
                logger);

        taskExecutionContext.setTaskParams(taskNode.getParams());
        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setDefinedParams(getGlobalParamsMap());

        // set task timeout
        setTaskTimeout(taskExecutionContext, taskNode);

        taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
                taskExecutionContext.getProcessDefineId(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));

        // create the task
        task = TaskManager.newTask(taskExecutionContext, taskLogger);

        // initialize the task
        task.init();

        // build the parameters the task needs
        preBuildBusinessParams();

        // execute the task
        task.handle();

        // post-execution actions
        task.after();

        responseCommand.setStatus(task.getExitStatus().getCode());
        responseCommand.setEndTime(new Date());
        responseCommand.setProcessId(task.getProcessId());
        responseCommand.setAppIds(task.getAppIds());
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
    } catch (Exception e) {
        logger.error("task scheduler failure", e);
        // on exception, kill the task
        kill();
        responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
        responseCommand.setEndTime(new Date());
        responseCommand.setProcessId(task.getProcessId());
        responseCommand.setAppIds(task.getAppIds());
    } finally {
        // remove the task execution context from the cache
        taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        // cache the responseCommand
        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
        // send the ResponseCommand to the master
        taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
        // clear the task execution path
        clearTaskExecPath();
    }
}

master#TaskResponseService

When the Worker executes a dispatched task normally, it sends an ACK command and a response command to the Master.

On the Master side, they are handled by TaskAckProcessor and TaskResponseProcessor respectively.

TaskAckProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
    logger.info("taskAckCommand : {}", taskAckCommand);

    // cache the task instance
    taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);

    String workerAddress = ChannelUtils.toAddress(channel).getAddress();

    ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());

    // TaskResponseEvent
    TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
            taskAckCommand.getStartTime(),
            workerAddress,
            taskAckCommand.getExecutePath(),
            taskAckCommand.getLogPath(),
            taskAckCommand.getTaskInstanceId(),
            channel);

    // the main processing logic
    taskResponseService.addResponse(taskResponseEvent);
}

TaskResponseProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
    logger.info("received command : {}", responseCommand);

    // cache the task instance
    taskInstanceCacheManager.cacheTaskInstance(responseCommand);

    // TaskResponseEvent
    TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
            responseCommand.getEndTime(),
            responseCommand.getProcessId(),
            responseCommand.getAppIds(),
            responseCommand.getTaskInstanceId(),
            channel);

    // the main processing logic
    taskResponseService.addResponse(taskResponseEvent);
}

TaskResponseService

As TaskResponseProcessor and TaskAckProcessor show, the main logic lives in the TaskResponseService class, and TaskResponseService processes these events through a TaskResponseWorker thread.

// the TaskResponseEvent queue is a blocking queue
private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>(5000);

class TaskResponseWorker extends Thread {

    @Override
    public void run() {
        while (Stopper.isRunning()) {
            try {
                // blocks here if there is no task event
                TaskResponseEvent taskResponseEvent = eventQueue.take();
                // persist the task instance state to the database
                persist(taskResponseEvent);
            } catch (InterruptedException e) {
                break;
            } catch (Exception e) {
                logger.error("persist task error", e);
            }
        }
        logger.info("TaskResponseWorker stopped");
    }
}

/**
 * persist taskResponseEvent
 * @param taskResponseEvent taskResponseEvent
 */
private void persist(TaskResponseEvent taskResponseEvent) {
    Event event = taskResponseEvent.getEvent();
    Channel channel = taskResponseEvent.getChannel();

    switch (event) {
        case ACK:
            try {
                TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
                if (taskInstance != null) {
                    ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
                    processService.changeTaskState(status,
                            taskResponseEvent.getStartTime(),
                            taskResponseEvent.getWorkerAddress(),
                            taskResponseEvent.getExecutePath(),
                            taskResponseEvent.getLogPath(),
                            taskResponseEvent.getTaskInstanceId());
                }
                // send a DB_TASK_ACK request to the worker
                DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                channel.writeAndFlush(taskAckCommand.convert2Command());
            } catch (Exception e) {
                logger.error("worker ack master error", e);
                DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
                channel.writeAndFlush(taskAckCommand.convert2Command());
            }
            break;
        case RESULT:
            try {
                TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
                if (taskInstance != null) {
                    processService.changeTaskState(taskResponseEvent.getState(),
                            taskResponseEvent.getEndTime(),
                            taskResponseEvent.getProcessId(),
                            taskResponseEvent.getAppIds(),
                            taskResponseEvent.getTaskInstanceId());
                }
                // send a DB_TASK_RESPONSE request to the worker
                DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                channel.writeAndFlush(taskResponseCommand.convert2Command());
            } catch (Exception e) {
                logger.error("worker response master error", e);
                DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
                channel.writeAndFlush(taskResponseCommand.convert2Command());
            }
            break;
        default:
            throw new IllegalArgumentException("invalid event type : " + event);
    }
}

Worker#DBTaskAckProcessor and DBTaskResponseProcessor

When the Worker receives the Master's db_task_ack command or db_task_response command, the corresponding processors are DBTaskAckProcessor and DBTaskResponseProcessor; in both cases the logic is simply to remove the corresponding task instance command from ResponceCache.

DBTaskAckProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), DBTaskAckCommand.class);
    if (taskAckCommand == null) {
        return;
    }

    if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
        ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
    }
}

DBTaskResponseProcessor

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
            String.format("invalid command type : %s", command.getType()));

    DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(command.getBody(), DBTaskResponseCommand.class);
    if (taskResponseCommand == null) {
        return;
    }

    if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
        ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
    }
}

How Task Kill Works

MasterTaskExecThread#waitTaskQuit

public Boolean waitTaskQuit() {
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    while (Stopper.isRunning()) {
        try {
            // ... code omitted ...
            // task instance add queue , waiting worker to kill
            // if the master receives a cancel request, or the workflow is in the READY_STOP state,
            // the master sends a kill request command to the worker
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance();
            }
            // ... code omitted ...
        } catch (Exception e) {
            // ... code omitted ...
        }
    }
    return true;
}

private void cancelTaskInstance() throws Exception {
    if (alreadyKilled) {
        return;
    }
    alreadyKilled = true;
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    if (StringUtils.isBlank(taskInstance.getHost())) {
        taskInstance.setState(ExecutionStatus.KILL);
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        return;
    }

    // build the TaskKillRequestCommand
    TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
    killCommand.setTaskInstanceId(taskInstance.getId());

    ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);

    Host host = Host.of(taskInstance.getHost());
    executionContext.setHost(host);

    nettyExecutorManager.executeDirectly(executionContext);

    logger.info("master kill taskInstance name :{} taskInstance id:{}",
            taskInstance.getName(), taskInstance.getId());
}

Worker#TaskKillProcessor

TaskKillProcessor handles the kill request command sent by the Master.

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
    logger.info("received kill command : {}", killCommand);

    Pair<Boolean, List<String>> result = doKill(killCommand);

    taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
            new NettyRemoteChannel(channel, command.getOpaque()));

    // send a kill response command to the master
    TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
    taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
    taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
}

private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
    boolean processFlag = true;
    List<String> appIds = Collections.emptyList();
    int taskInstanceId = killCommand.getTaskInstanceId();
    TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
    try {
        Integer processId = taskExecutionContext.getProcessId();
        if (processId.equals(0)) {
            taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
            logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
            return Pair.of(true, appIds);
        }

        // run "kill -9" to terminate the process directly;
        // if a Spark or Flink job was submitted to a cluster, it cannot be killed this way
        String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
        if (StringUtils.isNotEmpty(pidsStr)) {
            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
            logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
            OSUtils.exeCmd(cmd);
        }

    } catch (Exception e) {
        processFlag = false;
        logger.error("kill task error", e);
    }
    // find log and kill yarn job
    Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
            taskExecutionContext.getLogPath(),
            taskExecutionContext.getExecutePath(),
            taskExecutionContext.getTenantCode());
    return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
}

master#TaskKillResponseProcessor

TaskKillResponseProcessor is used by the Master to handle the Worker's response to a task kill request.

public void process(Channel channel, Command command) {
    Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
    TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
    logger.info("received task kill response command : {}", responseCommand);
}

Through this analysis of the Apache DolphinScheduler 1.3.9 source code, we have gained a deeper understanding of the design and implementation of its core modules.

If you are interested in the Apache DolphinScheduler source code, you can dig further into the details of its task scheduling strategies, or do secondary development based on your own business scenarios to make full use of DolphinScheduler's scheduling capabilities.

That's all for this article!

This article is published with support from 白鲸开源科技.
