手撸XXL-JOB(四)——远程调用定时任务

Java Socket网络编程

网络编程是Java编程中的重要组成部分,包括服务端和客户端两部分内容。Socket是Java网络编程的基本组件之一,用于在应用程序之间提供双向通信,Socket提供了一种标准的接口,允许应用程序通过网络发送和接收数据,在Java中,Socket可以分为客户端Socket和服务端Socket两种类型。
客户端Socket:客户端 Socket 用于与服务端 Socket 进行通信。客户端 Socket 通过指定服务端的 IP 地址和端口号,连接到服务端 Socket,然后发送数据到服务端 Socket。
服务端Socket:服务端 Socket 用于接收来自客户端 Socket 的连接请求,并在连接成功后,与客户端 Socket 进行通信。服务端 Socket 首先需要创建一个 ServerSocket 对象,并通过 bind 方法绑定到一个本地端口,然后等待客户端 Socket 的连接请求。
下面是Socket的一个示例:
服务端:

package org.example.demo1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;public class Server {public static void main(String[] args) {try {ServerSocket serverSocket = new ServerSocket(8000);System.out.println("Server started, waiting for client...");Socket socket = serverSocket.accept();System.out.println("Client connected.");BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream(), true);String message;while ((message = in.readLine()) != null) {System.out.println("Client:" + message);out.println("Server received message:" + message);}in.close();out.close();socket.close();serverSocket.close();} catch (IOException e) {e.printStackTrace();}}
}

客户端:

package org.example.demo1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;public class Client {public static void main(String[] args) {try {Socket socket = new Socket("localhost", 8000);System.out.println("Connected to server.");BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream(), true);BufferedReader consoleIn = new BufferedReader(new InputStreamReader(System.in));String message;while ((message = consoleIn.readLine()) != null) {out.println(message);System.out.println("Server:" + in.readLine());}consoleIn.close();in.close();out.close();socket.close();} catch (IOException e) {e.printStackTrace();}}
}

首先启动服务端,然后启动客户端,在客户端的控制台,输入数据,服务端能接收到数据并返回对应的响应。
image.png
image.png

远程调用定时任务

首先,我们创建两个模块,core模块包含yang-job的一些核心内容,比如IJobExecutor执行器、JobExecuteRequest执行器请求等;client模块依赖core模块,并封装和socket客户端调用相关的一些内容。
然后创建一个sample1模块,用于演示。
image.png

core模块

image.png
core目前定义了定时任务执行类和其入参、出参等信息,其中,YangJobTransferDTO包含任务类路径和任务请求,如下所示:

package com.yang.job.dto;import com.yang.job.execute.YangJobExecuteRequest;import java.io.Serializable;public class YangJobTransferDTO implements Serializable {private String className;private YangJobExecuteRequest yangJobExecuteRequest;public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public YangJobExecuteRequest getYangJobExecuteRequest() {return yangJobExecuteRequest;}public void setYangJobExecuteRequest(YangJobExecuteRequest yangJobExecuteRequest) {this.yangJobExecuteRequest = yangJobExecuteRequest;}
}
client模块

image.png
client模块定义了客户端所需要的一些类,其中,YangJob为注解类,对于每一个定时任务,需要加上YangJob注解,才能被正确调用。

package com.yang.job.client.annotations;import java.lang.annotation.*;@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface YangJob {
}

YangJobClientProperty为配置信息类,目前需要两个配置信息,客户端socket的ip和端口号

package com.yang.job.client.configuration;import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class YangJobClientProperty {@Value("${yang-job.executor.port}")private Integer port;@Value("${yang-job.executor.ip}")private String ip;public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}
}

YangJobClientPostProcessor在SpringBoot加载完毕后,扫描bean,将实现IYongJobExecutor的bean,注册到YangJobClientManager的map中,方便后续调用

package com.yang.job.client.schema;import com.yang.job.client.annotations.YangJob;
import com.yang.job.client.YangJobClientManager;
import com.yang.job.execute.IYangJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;public class YangJobClientPostProcessor implements BeanPostProcessor {public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (!(bean instanceof IYangJobExecutor)) {return bean;}YangJob annotation = bean.getClass().getAnnotation(YangJob.class);if (annotation == null) {return bean;}YangJobClientManager.putJobExecutor(bean.getClass().getName(), (IYangJobExecutor) bean);return bean;}
}

YangJobClientManager负责监听端口和管理定时任务的执行,它会监听我们配置的yang-job.execute.port端口号,然后当接收到消息时,将消息转为入参,并取出对应的定时任务执行类,执行对应的代码。

package com.yang.job.client;import com.alibaba.fastjson.JSONObject;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.dto.ResultT;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class YangJobClientManager {private static Map<String, IYangJobExecutor> className2JobExecutorMap = new ConcurrentHashMap<>();private YangJobClientPropertyDTO yangJobClientPropertyDTO;private ServerSocket serverSocket;public YangJobClientManager(YangJobClientPropertyDTO yangJobClientPropertyDTO) {this.yangJobClientPropertyDTO = yangJobClientPropertyDTO;}public void init() {Integer port = this.yangJobClientPropertyDTO.getPort();try {this.serverSocket = new ServerSocket(port);} catch (IOException e) {throw new RuntimeException(e);}System.out.println("init success============");new Thread(() -> {while (true) {try {Socket socket = serverSocket.accept();BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);String params = bufferedReader.readLine();YangJobTransferDTO yangJobTransferDTO = JSONObject.parseObject(params, YangJobTransferDTO.class);System.out.println(yangJobTransferDTO);String className = yangJobTransferDTO.getClassName();YangJobExecuteRequest yangJobExecuteRequest = yangJobTransferDTO.getYangJobExecuteRequest();IYangJobExecutor jobExecutor = getJobExecutor(className);if (jobExecutor != null) {ResultT response = jobExecutor.execute(yangJobExecuteRequest);printWriter.println(JSONObject.toJSONString(response));}bufferedReader.close();printWriter.close();socket.close();} catch (IOException e) {e.printStackTrace();}if (serverSocket.isClosed() || serverSocket == null) {break;}}}).start();}public void shutdown() {if (this.serverSocket != null) {try {this.serverSocket.close();} catch (IOException e) {throw new RuntimeException(e);}}}public YangJobClientPropertyDTO getYangJobPropertyDTO() {return this.yangJobClientPropertyDTO;}public static void putJobExecutor(String className, IYangJobExecutor iJobExecutor) {className2JobExecutorMap.put(className, iJobExecutor);}public static IYangJobExecutor getJobExecutor(String className) {return className2JobExecutorMap.get(className);}}

YangJobClientContext为客户端的上下文,负责监听SpringBoot刷新消息和关闭消息,并执行对应的操作。

package com.yang.job.client;import com.yang.job.client.utils.SpringContextUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;public class YangJobClientContext implements ApplicationListener<ApplicationContextEvent> {private static YangJobClientContext instance;private ApplicationContext applicationContext;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (event instanceof ContextRefreshedEvent) {System.out.println("刷新了=========");YangJobClientContext.instance = this;instance.applicationContext = applicationContext;init();} else if (event instanceof ContextClosedEvent) {System.out.println("销毁了=========");shutdown();}}private void init() {YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);yangJobClientManager.init();}private void shutdown() {YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);yangJobClientManager.shutdown();}
}

YangJobClientConfiguration为配置类,负责对YangJobClientPostProcessor、YangJobClientManager和YangJobClientContext的统一配置管理。

package com.yang.job.client.configuration;import com.yang.job.client.YangJobClientManager;
import com.yang.job.client.YangJobClientContext;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.client.schema.YangJobClientPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class YangJobClientConfiguration {@Autowiredprivate YangJobClientProperty yangJobClientProperty;@Beanpublic YangJobClientPostProcessor yangJobPostProcessor() {return new YangJobClientPostProcessor();}@Beanpublic YangJobClientManager yangJobClientManager() {YangJobClientPropertyDTO yangJobClientPropertyDTO = new YangJobClientPropertyDTO();yangJobClientPropertyDTO.setIp(yangJobClientProperty.getIp());yangJobClientPropertyDTO.setPort(yangJobClientProperty.getPort());return new YangJobClientManager(yangJobClientPropertyDTO);}@Beanpublic YangJobClientContext yangJobContext() {return new YangJobClientContext();}
}

最后,为了使引入client依赖的应用,能自动装配我们提供的bean,我们在resources目录下创建META-INF目录,在该目录下创建spring.factories文件,文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.yang.job.client.utils.SpringContextUtils,\com.yang.job.client.configuration.YangJobClientProperty,\com.yang.job.client.configuration.YangJobClientConfiguration
sample1

我们创建一个sample1项目,引入spring-boot-starter-web依赖和yang-client,yang-core的依赖

  <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.yang</groupId><artifactId>yang-job-core</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.yang</groupId><artifactId>yang-job-client</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>

创建启动类

package com.yang.job.sample1;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class YangJobSample1App {public static void main(String[] args) {SpringApplication.run(YangJobSample1App.class, args);}
}

创建一个任务类:

package com.yang.job.sample1.task;import com.yang.job.client.annotations.YangJob;
import com.yang.job.dto.ResultT;
import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;@Component
@YangJob
public class TestTask1 implements IYangJobExecutor {@Overridepublic ResultT execute(YangJobExecuteRequest yangJobExecuteRequest) {System.out.println("开启定时任务了,入参为:" + yangJobExecuteRequest);return ResultT.success();}
}

添加配置文件,因为client模块的YangJobClientProperty需要有yang-job.executor.port和yang-job.executor.ip这两个配置,如果我们的配置文件中,缺少这些配置,会导致报错,无法启动项目。

spring:application:name: YangJobSample1App
yang-job:executor:port: 9999ip: 127.0.0.1
server:port: 8001
测试

我们先启动刚才的sample1项目,然后执行下列代码,来远程调用TestTask1方法执行类。

 public static void main(String[] args) {try {Socket socket = new Socket("127.0.0.1", 9999);System.out.println("链接成功=============");PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId("1");yangJobExecuteRequest.addParam("num", "1");YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();yangJobTransferDTO.setClassName("com.yang.job.sample1.task.TestTask1");yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));System.out.println("response:" + bufferedReader.readLine());bufferedReader.close();printWriter.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

执行结果如下,说明我们能成功地进行远程调用。
image.png
image.png

添加远程任务

domain层

在上一篇文章中,我们操作的任务,都是本地任务,现在我们需要对远程任务进行操作,为了区分任务类型,我们首先在domain层添加一个任务类型枚举

package com.yang.job.admin.domain.enums;public enum JobTypeEnum {LOCAL("local", "本地任务"),REMOTE("remote", "远程任务");private String name;private String description;JobTypeEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return name;}public String getDescription() {return description;}public static JobTypeEnum getJobTypeByName(String name) {for (JobTypeEnum value : values()) {if (value.getName().equals(name)) {return value;}}return null;}
}

然后修改YangJobModel,添加上任务类型枚举和远程任务信息

package com.yang.job.admin.domain.model;import com.yang.job.admin.client.dto.common.BusinessException;
import com.yang.job.admin.client.dto.common.ErrorCode;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.event.SaveJobPostEvent;
import com.yang.job.admin.domain.event.SubmitJobPostEvent;
import com.yang.job.admin.domain.event.UpdateJobPostEvent;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.event.EventCenter;
import com.yang.job.admin.infra.job.YangJobManager;
import com.yang.job.admin.infra.job.request.YangJobSubmitParam;
import com.yang.job.admin.infra.utils.CronUtils;
import com.yang.job.admin.infra.utils.SpringContextUtils;
import lombok.Data;import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobModel implements Serializable {private Integer jobId;private String jobName;private String description;private String cron;private String executeClassPath;private Runnable runnable;private JobExecuteStrategyEnum executeStrategy;private JobTypeEnum jobType;private RemoteExecutorMessage remoteExecutorMessage;private Integer enable;private Integer open;private Date createTime;private Date updateTime;private Map<String, String> featureMap = new HashMap<>();private Map<String, String> executeParamMap = new HashMap<>();public boolean isEnable() {if (this.enable == null) {return false;}return this.enable == 1;}public boolean isOpen() {if (!isEnable()) {return false;}if (this.open == null) {return false;}return this.open == 1;}public boolean isClose() {return !isOpen();}public boolean isLocalJob() {return JobTypeEnum.LOCAL == this.jobType;}public boolean isRemoteJob() {return JobTypeEnum.REMOTE == this.jobType;}public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) {if (jobExecuteStrategyEnum == null) {throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST);}this.executeStrategy = jobExecuteStrategyEnum;}public void submitJob() {YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam();YangJobManager yangJobManager = getYangJobManager();yangJobManager.submitJob(yangJobSubmitParam);// 提交任务后,发送提交任务后置事件SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam);getEventCenter().postEvent(submitJobPostEvent);}public void cancelJob() {YangJobManager yangJobManager = getYangJobManager();yangJobManager.cancelJob(this.jobId);}private YangJobSubmitParam convert2YangJobSubmitParam() {YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam();yangJobBuildParam.setJobId(this.jobId);yangJobBuildParam.setRunnable(this.runnable);ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now());ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime);long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();yangJobBuildParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);yangJobBuildParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy);return yangJobBuildParam;}public void postSaveJobEvent() {SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(saveJobPostEvent);}public void postUpdateJobEvent() {UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(updateJobPostEvent);}public void postDeleteJobEvent() {UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(updateJobPostEvent);}private YangJobManager getYangJobManager() {return SpringContextUtils.getBeanOfType(YangJobManager.class);}private EventCenter getEventCenter() {return SpringContextUtils.getBeanOfType(EventCenter.class);}}

远程任务信息类:

package com.yang.job.admin.domain.valueobject;import lombok.Data;import java.io.Serializable;@Data
public class RemoteExecutorMessage implements Serializable {private String ip;private Integer port;
}

接着我们添加一个features枚举,用于记录映射features字段中各个key表示的含义,因为我们现在表的设计中没有任务类型字段和远程信息相关的字段,所以会将这些信息添加到features字段中

package com.yang.job.admin.domain.enums;public enum JobModelFeatureEnum {JOB_TYPE("jobType", "任务类型"),REMOTE_EXECUTOR_IP("executorIp", "执行器ip"),REMOTE_EXECUTOR_PORT("executorPort", "执行器端口"),REMOTE_EXECUTOR_MESSAGE("r_executor_m", "远程执行器的信息");private String name;private String description;JobModelFeatureEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return name;}public String getDescription() {return description;}
}
client层

我们修改原先的NewYangJobCommand类,加上任务类型属性

package com.yang.job.admin.client.dto.command;import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;@Data
public class NewYangJobCommand implements Serializable {private String jobName;private String description;private String cron;private String executeStrategy;private String jobType;private String executeClassPath;private Integer open;private Map<String, String> params = new HashMap<>();
}

然后修改YangJobDTO类,也加上jobType属性

package com.yang.job.admin.client.dto;import lombok.Data;import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobDTO implements Serializable {private Integer jobId;private String jobName;private String description;private String cron;private String executeStrategy;private String executeClassPath;private String jobType;private Integer enable;private Integer open;private Date createTime;private Date updateTime;private Map<String, String> featureMap = new HashMap<>();private Map<String, String> executeParamMap = new HashMap<>();
}
application层

接着修改YangJobApplicationService类的convertYangJobModel方法,将jobType任务类型和远程任务信息添加到YangJobModel中

 private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) {String jobType = newYangJobCommand.getJobType();JobTypeEnum jobTypeEnum = JobTypeEnum.getJobTypeByName(jobType);if (jobType == null) {throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);}YangJobModel yangJobModel = new YangJobModel();yangJobModel.setJobName(newYangJobCommand.getJobName());yangJobModel.setDescription(newYangJobCommand.getDescription());yangJobModel.setCron(newYangJobCommand.getCron());yangJobModel.setOpen(newYangJobCommand.getOpen());yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy()));yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath());yangJobModel.setExecuteParamMap(newYangJobCommand.getParams());yangJobModel.setJobType(jobTypeEnum);if (jobTypeEnum == JobTypeEnum.REMOTE) {String ip = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_IP.getName());String port = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_PORT.getName());if (ip == null || port == null) {throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);}RemoteExecutorMessage remoteExecutorMessage = new RemoteExecutorMessage();remoteExecutorMessage.setIp(ip);remoteExecutorMessage.setPort(Integer.valueOf(port));yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);} else {if (yangJobModel.getExecuteClassPath() == null || yangJobModel.getExecuteClassPath().isEmpty()) {throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);}try {Class.forName(yangJobModel.getExecuteClassPath());} catch (ClassNotFoundException e) {e.printStackTrace();throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);}}return yangJobModel;}
infra层

最后修改基础设施层,首先修改YangJobModelConvertor类,将RemoteMessage和JobType转化到features中,以及从features中取出

package com.yang.job.admin.infra.gatewayimpl.repository.convertor;import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobModelFeatureEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.model.YangJobModel;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.data.YangJobData;
import com.yang.job.admin.infra.job.thread.RemoteJobExecuteThread;
import com.yang.job.admin.infra.utils.FeaturesUtils;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class YangJobModelConvertor {public YangJobData convert2Data(YangJobModel yangJobModel) {if (yangJobModel == null) {return null;}YangJobData yangJobData = new YangJobData();yangJobData.setJobId(yangJobModel.getJobId());yangJobData.setJobName(yangJobModel.getJobName());yangJobData.setDescription(yangJobModel.getDescription());yangJobData.setCron(yangJobModel.getCron());yangJobData.setExecuteClassPath(yangJobModel.getExecuteClassPath());yangJobData.setEnable(yangJobModel.getEnable());yangJobData.setOpen(yangJobModel.getOpen());yangJobData.setCreateTime(yangJobModel.getCreateTime());yangJobData.setUpdateTime(yangJobModel.getUpdateTime());Map<String, String> featureMap = yangJobModel.getFeatureMap();featureMap.put(JobModelFeatureEnum.JOB_TYPE.getName(), yangJobModel.getJobType().getName());featureMap.put(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName(), JSONObject.toJSONString(yangJobModel.getRemoteExecutorMessage()));yangJobData.setFeatures(FeaturesUtils.convert2Features(featureMap));yangJobData.setExecuteParams(FeaturesUtils.convert2Features(yangJobModel.getExecuteParamMap()));yangJobData.setExecuteStrategy(yangJobModel.getExecuteStrategy().getName());return yangJobData;}public YangJobModel convert2Model(YangJobData yangJobData) {if (yangJobData == null) {return null;}YangJobModel yangJobModel = new YangJobModel();yangJobModel.setJobId(yangJobData.getJobId());yangJobModel.setDescription(yangJobData.getDescription());yangJobModel.setCron(yangJobData.getCron());yangJobModel.setJobName(yangJobData.getJobName());yangJobModel.setExecuteClassPath(yangJobData.getExecuteClassPath());yangJobModel.setEnable(yangJobData.getEnable());yangJobModel.setOpen(yangJobData.getOpen());yangJobModel.setCreateTime(yangJobData.getCreateTime());yangJobModel.setUpdateTime(yangJobData.getUpdateTime());yangJobModel.setFeatureMap(FeaturesUtils.convert2FeatureMap(yangJobData.getFeatures()));yangJobModel.setExecuteParamMap(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));JobExecuteStrategyEnum executeStrategy = JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy());if (executeStrategy == null) {throw new RuntimeException("执行策略有误!");}JobTypeEnum jobType = JobTypeEnum.getJobTypeByName(yangJobModel.getFeatureMap().get(JobModelFeatureEnum.JOB_TYPE.getName()));yangJobModel.setJobType(jobType);String remoteMessageStr = yangJobModel.getFeatureMap().get(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName());RemoteExecutorMessage remoteExecutorMessage = JSONObject.parseObject(remoteMessageStr, RemoteExecutorMessage.class);yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);yangJobModel.setExecuteStrategy(executeStrategy);yangJobModel.setRunnable(buildRunnable(yangJobModel));return yangJobModel;}private Runnable buildRunnable(YangJobModel yangJobModel) {if (yangJobModel.isLocalJob()) {String executeClassPath = yangJobModel.getExecuteClassPath();try {Class<?> aClass = Class.forName(executeClassPath);if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {throw new RuntimeException("该类必须实现IYangJobExecutor接口");}IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);Runnable runnable = () -> executor.execute(yangJobExecuteRequest);return runnable;} catch (InstantiationException | IllegalAccessException e) {e.printStackTrace();} catch (ClassNotFoundException e) {System.out.println(String.format("%s 类路径对应的类不存在", executeClassPath));e.printStackTrace();}} else {RemoteExecutorMessage remoteExecutorMessage = yangJobModel.getRemoteExecutorMessage();String executeClassPath = yangJobModel.getExecuteClassPath();YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();yangJobTransferDTO.setClassName(executeClassPath);YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);return new RemoteJobExecuteThread(remoteExecutorMessage, yangJobTransferDTO);}return null;}private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobModel yangJobModel) {YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId(yangJobModel.getJobId().toString());yangJobExecuteRequest.setParams(yangJobModel.getExecuteParamMap());return yangJobExecuteRequest;}
}

然后添加一个RemoteJobExecuteThread类,该类实现runnable接口,当我们的任务类型为远程调用时,其YangJobModel的runnable属性为remoteJobExecuteThread类

package com.yang.job.admin.infra.job.thread;import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.core.dto.YangJobTransferDTO;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;public class RemoteJobExecuteThread implements Runnable {private YangJobTransferDTO yangJobTransferDTO;private RemoteExecutorMessage remoteExecutorMessage;public RemoteJobExecuteThread(RemoteExecutorMessage remoteExecutorMessage, YangJobTransferDTO yangJobTransferDTO) {this.remoteExecutorMessage = remoteExecutorMessage;this.yangJobTransferDTO = yangJobTransferDTO;}@Overridepublic void run() {try {String ip = remoteExecutorMessage.getIp();Integer port = remoteExecutorMessage.getPort();Socket socket = new Socket(ip, port);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));bufferedReader.close();printWriter.close();socket.close();} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}
测试

我们先启动之前的sample1项目,然后启动yang-job-admin,调用http://localhost:8080/job添加任务,请求体如下:

{"jobName": "RemoteJobExecutor",
"description":"RemoteJobExecutor","cron": "0/10 * * * * ?","executeStrategy": "withFixedDelay","executeClassPath": "com.yang.job.sample1.task.TestTask1",
"open":1,
"jobType":"remote",
"params":{
"executorIp":"127.0.0.1",
"executorPort":"9999"
}
}

image.png
添加成功后,我们查看Sample1项目的控制台,可以看到,每10秒,这个TestTask1任务会被调用一次
image.png

参考文章

https://www.yihuo.tech/programming/server-stack/exploring-the-java-network-programming-paradigm-socket-udp-nio-and-netty-in-focus/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/327946.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据中台管理系统原型

数据中台是一个通用性的基础平台&#xff0c;适用于各类行业场景&#xff0c;数据中台包含多元数据汇聚、数据标准化、数据开发、数据共享、数据智能、数据资产管理等功能&#xff0c;助力企业数字化转型。 数据汇聚 数据汇聚是将不同系统、不同类型的多元源数据汇聚至目标数据…

20.接口自动化-Git

1、Git和SVN–版本控制系统 远程服务出问题后&#xff0c;可以先提交commit到本地仓库&#xff0c;之后再提交push远程仓库 git有clone Git环境组成部分 常用Git代码仓库服务-远程仓库 GitHub-服务器在国外&#xff0c;慢 GitLab-开源&#xff0c;可以在自己服务器搭建&…

真JAVA代码审计之XSS漏洞

Part1 漏洞案例demo&#xff1a; 没有java代码审计XSS漏洞拿赏金的案例。 所以将就看看demo吧 漏洞原理&#xff1a;关于XSS漏洞的漏洞原理核心其实没啥好说的&#xff0c;网上一查一大堆。 反射性XSS漏洞 <% page language"java" contentType"text/ht…

2. 感知机算法和简单 Python 实现

目录 1. 感知机介绍 1.1 背景 1.2 定义 1.2.1 权重 1.2.2 阈值 1.2.3 偏置 1.3 逻辑处理&#xff1a;与门、非门、或门 2. 感知机实现 2.1 与门的 Python 实现 2.2 非门的 Python 实现 2.3 或门的 Python 实现 1. 感知机介绍 1.1 背景 感知机1957年由 Rosenblatt 提…

【全开源】JAVA国际版多语言语聊大厅语音聊天APP系统源码

国际版多语言语聊大厅语音聊天APP系统&#xff1a;跨越语言的界限&#xff0c;连接世界的声音 在全球化日益加速的今天&#xff0c;语言不再是沟通的障碍。我们很高兴地宣布&#xff0c;全新的“国际版多语言语聊大厅语音聊天APP系统”已经正式上线&#xff0c;旨在为全球用户…

【千帆AppBuidler】零代码构建AI人工智能应用,全网都在喊话歌手谁能应战,一键AI制作歌手信息查询应用

欢迎来到《小5讲堂》 这是《千帆平台》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解。 温馨提示&#xff1a;博主能力有限&#xff0c;理解水平有限&#xff0c;若有不对之处望指正&#xff01; 目录 背景创建应用平台地址随机生成快速创建应用头像应用名称应用描述…

【基于element ui的color选择器】基于element ui的color选择器

技术版本如下&#xff1a; vue 2.6.14 less 3.13.1 element-ui 2.15.6 less-loader 5.0.0需求&#xff1a; 支持RGB、HEX编码、支持吸管吸取颜色、颜色选择器、颜色模板、透明度、色板、线性渐变颜色 效果图&#xff1a; 1.引入选择器的color-all文件 <template><…

[vue] nvm

nvm ls // 看安装的所有node.js的版本nvm list available // 查显示可以安装的所有node.js的版本可以在可选列表里。选择任意版本安装&#xff0c;比如安装16.15.0 执行&#xff1a; nvm install 16.15.0安装好了之后。可以执行&#xff1a; …

鸿蒙内核源码分析 (内核启动篇) | 从汇编到 main ()

这应该是系列篇最难写的一篇&#xff0c;全是汇编代码&#xff0c;需大量的底层知识&#xff0c;涉及协处理器&#xff0c;内核镜像重定位&#xff0c;创建内核映射表&#xff0c;初始化 CPU 模式栈&#xff0c;热启动&#xff0c;到最后熟悉的 main() 。 内核入口 在链接文件…

FreeRTOS开发一、FreeRTOS移植

1、FreeRTOS 源码下载 两个下载链接&#xff0c; 一个是官网&#xff1a;http://www.freertos.org/&#xff0c; 另外一个是代码托管网站&#xff1a;https://sourceforge.net/projects/freertos/files/FreeRTOS/ 打开代码托管网站链接&#xff0c;我们选择FreeRTOS 的版本 V9…

全域运营平台是什么?优缺点有哪些?

当下&#xff0c;全域运营赛道逐渐兴盛&#xff0c;全域运营服务商的数量也开始呈现爆发趋势。在此背景下&#xff0c;很多人都对某些品牌的全域运营平台优缺点产生了浓厚的兴趣。由于小编只使用过微火全域运营平台&#xff0c;因此&#xff0c;本期会着重分析微火运营平台的优…

【软考】设计模式之桥接模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 适用性6. 优点7. 缺点8. java示例 1. 说明 1.将抽象部分与其实现部分分离&#xff0c;使它们都可以独立地变化。2.桥接模式&#xff08;Bridge Pattern&#xff09;属于对象结构型模式&#xff0c;又称为柄体&#xff08;Handle an…

Leetcode2105. 给植物浇水 II

Every day a Leetcode 题目来源&#xff1a;2105. 给植物浇水 II 解法1&#xff1a;双指针 设 Alice 当前下标为 i&#xff0c;初始化为 0&#xff0c;水量为 a&#xff0c;初始化为 capacityA&#xff1b;Bob 当前下标为 j&#xff0c;初始化为 n-1&#xff0c;水量为 b&am…

力扣98.验证二叉搜索树

法一&#xff08;自己思路&#xff0c;复杂了&#xff09;&#xff1a; from collections import dequeclass Solution(object):def isValidBST(self, root):""":type root: TreeNode:rtype: bool"""queue deque()if root.left!None:queue.app…

FOSS全闪对象存储--与AI/ML相向而行

行业解读需求剖析 目前&#xff0c;随着AI/ML技术得到了快速的发展及应用&#xff0c;AI/ML系统对底层高速数据访问的需求也日趋强烈&#xff0c;虽然当前业界有多种解决方案&#xff0c;但都存在一些成本或性能方面的挑战&#xff0c;就目前常用的文件存储系统来说&#xff0…

win10共享文件夹到ubuntu22

win10共享文件夹 新建用户 新建用户、设置密码。避免共享给EveryOne&#xff0c;导致隐私问题。 点击左下角的开始菜单&#xff0c;选择“设置”&#xff08;WinI&#xff09;打开设置窗口。在设置窗口中&#xff0c;搜索或直接点击“账户”进入账户设置。在账户设置中&…

机器学习中常用的几种距离——欧式、余弦等

目录 一、欧式距离&#xff08;L2距离&#xff09;二、曼哈顿距离&#xff08;L1距离&#xff09;三、汉明距离四、余弦相似度 一、欧式距离&#xff08;L2距离&#xff09; &#xff08;1&#xff09;二维空间的距离公式&#xff08;三维空间的在这个基础上类推&#xff09;&…

Windows本地部署直播录屏利器Bililive-go并实现远程添加直播间录屏

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 1. Bililive-go与套件下载1.1 获取ffmpeg1.2 获取Bililive-go1.3 配置套件 2. 本地运行测试3. 录屏…

基于单片机的光照检测系统—光敏电阻

基于单片机的光照检测系统 &#xff08;仿真&#xff0b;程序&#xff0b;原理图&#xff0b;设计报告&#xff09; 功能介绍 具体功能&#xff1a; 1.光敏电阻实时采集环境光照值&#xff1b; 2.采用ADC0804将模拟值转换为数字量&#xff1b; 3.四位数码管显示当前的光照…

Java JDK下载安装教程(2024年)

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…