前言:
在企业项目中,往往有定时任务发布的需求,比如每天晚9点将今日数据备份一次,或每月一号将上月的销售数据邮件发送给对应的工作人员。显然这些操作不可能是人工到时间点调用一次接口,需要编写专门的模块完成任务的调度。
Quartz 是一个强大的 Java 作业调度库,支持持久化和集群模式,适合需要高可用性和负载均衡的应用场景。其使用方法相对简单。只需要实现Job接口定义调度任务,通过JobDetail传入调度所需数据,并定义触发器,即可通过Scheduler类完成任务调度,其中每个任务对应一组Trigger和JobDetail类。下为每5秒调用一次DateJob类的execute方法,共调用6次的示例。
public class DateJob implements Job {@Overridepublic void execute(JobExecutionContext arg0) throws JobExecutionException {System.out.println(new Date());}}
//从标准SchedulerFactory获取默认scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
JobDetail job = JobBuilder.newJob(DateJob.class).withIdentity("job1", "group1").build();
Trigger trigger1 = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();scheduler.scheduleJob(job, trigger1);scheduler.start();
但如果有多个定时任务时就需要定义多个实现类,且无法将任务状态维护在数据库中,针对上述问题,本文实现一种基于Quartz的定时任务模块,只需要将定时任务所需要的参数定义在数据库中,模块可从数据库中生成对应的定时任务并进行维护。本文将从易到难逐步实践搭建完整模块并验证各部分代码内容。具体实现类图如下,其中虚线框部分为Quartz自带库。
上述类图为模块简化版本,实际还能引入装饰器模式、事件订阅和发布、AOP处理异常等进一步优化,本章将详细讲述如何从零搭建一个Quartz模块,并在后续通过一或两章的内容补充上述功能,并将模块整合至之前的微服务项目Base中,链接如下。
从零搭建微服务项目Base(第0章——微服务项目结构搭建)_从0创建微服务项目-CSDN博客https://blog.csdn.net/wlf2030/article/details/145206361本章项目源码如下:
wlf728050719/SpringCloudPro1-1https://github.com/wlf728050719/SpringCloudPro1-1以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。
一、项目创建
1.Spring Boot项目
2.添加Quartz依赖(也可不加,后续可直接替换pom文件导入依赖)
3.删去无用文件以及resources下application.properties,创建application.yml内容如下:
(记得替换成自己的数据库,并测试连接成功,否则启动服务时会报找不到数据源)
server:port: 6699
spring:datasource:url: jdbc:mysql://localhost:3306/db_quartz?useSSL=falseusername: rootpassword: 15947035212driver-class-name: com.mysql.jdbc.Driverapplication:name: quartz
mybatis:mapper-locations: classpath:mapper/*Mapper.xmltype-aliases-package: cn.bit.pro1_1.core.mapper
4.pom内容替换为下面内容:(尽量保证依赖版本相同,特别是mybatis版本与spring版本一致)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.bit</groupId><artifactId>Pro1_1</artifactId><version>0.0.1-SNAPSHOT</version><name>Pro1_1</name><description>Pro1_1</description><dependencies><!-- Web依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.9.RELEASE</version></dependency><!-- Quartz --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- Mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--Mybatis--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.1</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope><version>1.18.36</version><optional>true</optional></dependency><!--test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
创建包结构如下,最外层中controller为quartz模块对外接口,core为核心,service中定义类为定时任务真正执行的业务类,core中enums定义任务执行的结果、任务类型、任务状态枚举,exception定义任务执行的异常类,handler定义java,springbean的任务处理类以及接口和工厂类。mapper和service主要服务操纵数据库任务数据,util定义工具类。
二、Scheduler入门demo
service包下创建DateService类,controller创建QuartzController类。
DateService实现job接口,执行方法为输出当前时间。QuartzController定义一个对外接口,测试quratz的简单使用。
package cn.bit.pro1_1.service;import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;import java.util.Date;public class DateService implements Job {@Overridepublic void execute(JobExecutionContext arg0) throws JobExecutionException {System.out.println(new Date());}}
package cn.bit.pro1_1.controller;import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {@GetMapping("/test")public String test() throws SchedulerException {//从标准SchedulerFactory获取默认schedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();Trigger trigger1 = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();scheduler.scheduleJob(job, trigger1);scheduler.start();return "ok";}
}
启动服务:
能够看到控制台每五秒输出一次当前时间并执行6次。如果显示网页错误可能端口被占用,可以自行在application.xml文件中配置端口。
三、任务相关类定义
创建Task类以及对应枚举以及异常类,具体如下图:
Task内容如下:
package cn.bit.pro1_1.core;import lombok.Data;
import lombok.ToString;@Data
@ToString
public class Task {private Integer id;private String taskName;private String taskGroup;private Integer type;//1、java类 2、Spring Bean 3、http请求private String beanName;//bean名称private String className;//java类名private String path;//rest请求路径private String methodName;//方法名private String params;//方法参数private String cronExpression;//cron表达式private String description;//描述private Integer status;//任务当前状态private Integer result;//任务执行结果
}
枚举内容分别如下:
package cn.bit.pro1_1.core.enums;import lombok.AllArgsConstructor;
import lombok.Getter;@AllArgsConstructor
@Getter
public enum Result {FAIL(0,"失败"),SUCCESS(1,"成功");private final Integer code;private final String desc;
}
package cn.bit.pro1_1.core.enums;import lombok.AllArgsConstructor;
import lombok.Getter;@AllArgsConstructor
@Getter
public enum TaskStatus {PAUSE(0, "暂停中"),RUNNING(1, "运行中");private final Integer code;private final String desc;
}
package cn.bit.pro1_1.core.enums;import lombok.AllArgsConstructor;
import lombok.Getter;@AllArgsConstructor
@Getter
public enum TaskType {JAVA_CLASS(1,"java类"),SPRING_BEAN(2,"spring bean"),HTTP(3,"http");private final Integer code;private final String desc;
}
异常类
package cn.bit.pro1_1.core.exception;import cn.bit.pro1_1.core.Task;
import lombok.Getter;@Getter
public class TaskException extends Exception {private final Task task;public TaskException(String message, Task task) {super(message);this.task = task;}
}
四、handler接口、实现类以及工厂类
在util中创建SpringContextHolder工具类方便其他类拿取ApplicationContext。
package cn.bit.pro1_1.core.util;import lombok.Getter;
import lombok.NonNull;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;@Service
public class SpringContextHolder implements ApplicationContextAware {@Getterprivate static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {SpringContextHolder.applicationContext = applicationContext;}
}
在handler中创建如下类和接口。
ITaskHandler内容
package cn.bit.pro1_1.core.handler;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;public interface ITaskHandler {void invoke(Task task) throws TaskException;
}
JavaClassTaskHandler内容,使用全类名调用方法
package cn.bit.pro1_1.core.handler.impl;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@Component
public class JavaClassTaskHandler implements ITaskHandler {@Overridepublic void invoke(Task task) throws TaskException {try {Object target;Class<?> clazz;Method method;Result returnValue;clazz = Class.forName(task.getClassName());target = clazz.newInstance();if (task.getParams() == null || task.getParams().isEmpty()) {method = target.getClass().getDeclaredMethod(task.getMethodName());ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target);} else {method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target, task.getParams());}//判断业务是否执行成功if (returnValue == null || Result.FAIL.equals(returnValue))throw new TaskException("JavaClassTaskHandler方法执行失败", task);} catch (NoSuchMethodException e) {throw new TaskException("JavaClassTaskHandler找不到对应方法", task);} catch (InvocationTargetException | IllegalAccessException e) {throw new TaskException("JavaClassTaskHandler执行反射方法异常", task);} catch (ClassCastException e) {throw new TaskException("JavaClassTaskHandler方法返回值定义错误", task);} catch (ClassNotFoundException e) {throw new TaskException("JavaClassTaskHandler找不到对应类", task);} catch (InstantiationException e) {throw new TaskException("JavaClassTaskHandler实例化错误", task);}}
}
SpringBeanTaskHandler内容,通过Spring Bean的名称获取对应类以及方法。
package cn.bit.pro1_1.core.handler.impl;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.Result;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@Component
public class SpringBeanTaskHandler implements ITaskHandler {@Overridepublic void invoke(Task task) throws TaskException {try {Object target;Method method;Result returnValue;//上下文寻找对应beantarget = SpringContextHolder.getApplicationContext().getBean(task.getBeanName());//寻找对应方法if(task.getParams()==null|| task.getParams().isEmpty()){method = target.getClass().getDeclaredMethod(task.getMethodName());ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target);}else{method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target, task.getParams());}//判断业务是否执行成功if(returnValue==null || Result.FAIL.equals(returnValue))throw new TaskException("SpringBeanTaskHandler方法执行失败", task);}catch (NoSuchBeanDefinitionException e){throw new TaskException("SpringBeanTaskHandler找不到对应bean", task);} catch (NoSuchMethodException e) {throw new TaskException("SpringBeanTaskHandler找不到对应方法", task);} catch (InvocationTargetException | IllegalAccessException e) {throw new TaskException("SpringBeanTaskHandler执行反射方法异常", task);} catch (ClassCastException e) {throw new TaskException("SpringBeanTaskHandler方法返回值定义错误", task);}}
}
TaskHandlerFactory内容,根据Task类型判断使用哪种handler处理
package cn.bit.pro1_1.core.handler;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.enums.TaskType;
import cn.bit.pro1_1.core.handler.impl.JavaClassTaskHandler;
import cn.bit.pro1_1.core.handler.impl.SpringBeanTaskHandler;
import cn.bit.pro1_1.core.util.SpringContextHolder;
import org.springframework.stereotype.Component;@Component
public class TaskHandlerFactory {public static ITaskHandler getTaskHandler(Task task) {ITaskHandler taskHandler = null;if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) {taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class);}if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) {taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class);}return taskHandler;}
}
五、业务定义
最外层的service层中再创建两个测试业务分别测试bean名称调用和java全类名调用两种业务。
内容如下:
package cn.bit.pro1_1.service;import cn.bit.pro1_1.core.enums.Result;
import org.springframework.stereotype.Service;@Service("test")
public class TestBeanService {public Result test(String param){System.out.println(param);return Result.SUCCESS;}
}
package cn.bit.pro1_1.service;import cn.bit.pro1_1.core.enums.Result;public class TestJavaService {public Result test(String param){System.out.println(param);return Result.SUCCESS;}
}
controller中增加测试接口test1和test2
package cn.bit.pro1_1.controller;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {@GetMapping("/test")public String test() throws SchedulerException {//从标准SchedulerFactory获取默认schedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();Trigger trigger1 = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();scheduler.scheduleJob(job, trigger1);scheduler.start();return "ok";}@GetMapping("/test1")public String test1(){Task task = new Task();task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}@GetMapping("/test2")public String test2(){Task task = new Task();task.setType(1);task.setClassName("cn.bit.pro1_1.service.TestJavaService");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}
}
调用test1
再调用test2
无论是bean调用还是类名调用均成功。
六、任务调度器
core中定义SysJob以及TaskManager类
SysJob实现job接口,完成之前反射调用的封装。内容如下:
package cn.bit.pro1_1.core;import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;@Slf4j
public class SysJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext){Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task");ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task);try {handler.invoke(task);} catch (TaskException e) {log.error("{},Task:{}", e.getMessage(), e.getTask().toString());}}
}
TaskManger则对外提供服务管理调度器中任务状态。内容如下:(其中注释部分为持久化部分)
package cn.bit.pro1_1.core;import cn.bit.pro1_1.core.enums.TaskStatus;import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.List;@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {private Scheduler scheduler;
// private TaskService taskService;@PostConstructpublic void init() throws SchedulerException {log.info("TaskManager初始化开始...");
// List<Task> tasks = taskService.selectAllTask();
// if(tasks != null && !tasks.isEmpty()) {
// for (Task task : tasks)
// {
// addOrUpdateTask(task);
// }
// log.info("共加载{}项任务", tasks.size());
// }log.info("TaskManager初始化结束...");}public static JobKey getJobKey(@NonNull Task task) {return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());}public static TriggerKey getTriggerKey(@NonNull Task task) {return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());}public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {CronTrigger trigger = null;JobKey jobKey = getJobKey(task);TriggerKey triggerKey = getTriggerKey(task);trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 判断触发器是否存在(如果存在说明之前运行过但是在当前被禁用了,如果不存在说明一次都没运行过)if (trigger == null) {// 新建一个工作任务 指定任务类型为串接进行的JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();// 将任务信息添加到任务信息中jobDetail.getJobDataMap().put("task",task);// 将cron表达式进行转换CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());// 创建触发器并将cron表达式对象给塞入trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();// 在调度器中将触发器和任务进行组合scheduler.scheduleJob(jobDetail, trigger);// if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {
// taskService.insertTask(task);
// }else
// taskService.updateTask(task);}else {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());// 按照新的规则进行trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();// 将任务信息更新到任务信息中trigger.getJobDataMap().put("task", task);// 重启scheduler.rescheduleJob(triggerKey, trigger);
// taskService.updateTask(task);}// 如任务状态为暂停if (task.getStatus().equals(TaskStatus.PAUSE.getCode())) {this.pauseJob(task);}}public void pauseJob(@NonNull Task task) throws SchedulerException {scheduler.pauseJob(getJobKey(task));task.setStatus(TaskStatus.PAUSE.getCode());
// taskService.updateTask(task);}
}
controller中添加start和pause方法,验证调度是否成功。
package cn.bit.pro1_1.controller;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {private TaskManager taskManager;@GetMapping("/test")public String test() throws SchedulerException {//从标准SchedulerFactory获取默认schedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();Trigger trigger1 = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();scheduler.scheduleJob(job, trigger1);scheduler.start();return "ok";}@GetMapping("/test1")public String test1(){Task task = new Task();task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}@GetMapping("/test2")public String test2(){Task task = new Task();task.setType(1);task.setClassName("cn.bit.pro1_1.service.TestJavaService");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}@GetMapping("/start")public String start() throws SchedulerException {Task task = new Task();task.setTaskName("task1");task.setTaskGroup("group1");task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");task.setCronExpression("*/5 * * * * ?");task.setStatus(1);taskManager.addOrUpdateTask(task);return "ok";}@GetMapping("/pause")public String pause() throws SchedulerException {Task task = new Task();task.setTaskName("task1");task.setTaskGroup("group1");task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");task.setCronExpression("*/5 * * * * ?");task.setStatus(1);taskManager.pauseJob(task);return "ok";}
}
启动服务,调用接口
能够查看到任务正不断被调用。调用暂停接口后任务不再触发。
七、持久化定时任务状态
在对应数据库创建表,sql如下:
CREATE TABLE tb_task (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,task_name VARCHAR(255) NOT NULL,task_group VARCHAR(255) NOT NULL,type INT NOT NULL,bean_name VARCHAR(255) NULL,class_name VARCHAR(255) NULL,path VARCHAR(255) NULL,method_name VARCHAR(255) NULL,params VARCHAR(255) NULL,cron_expression VARCHAR(255) NOT NULL,description TEXT NULL,status INT NOT NULL DEFAULT 0,result INT NULL
);
创建对应mapper,service,serviceImpl以及mapper.xml
内容如下:
package cn.bit.pro1_1.core.mapper;import cn.bit.pro1_1.core.Task;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;import java.util.List;@Mapper
public interface TaskMapper {List<Task> selectAllTask();int updateTask(@Param("task") Task task);int insertTask(@Param("task") Task task);Task selectTaskByNameAndGroup(@Param("name") String name, @Param("group") String group );
}
package cn.bit.pro1_1.core.service;import cn.bit.pro1_1.core.Task;import java.util.List;public interface TaskService {List<Task> selectAllTask();int updateTask(Task task);int insertTask(Task task);Task selectTaskByNameAndGroup(String taskName, String groupName);
}
package cn.bit.pro1_1.core.service.impl;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.mapper.TaskMapper;
import cn.bit.pro1_1.core.service.TaskService;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;import java.util.List;@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {private TaskMapper taskMapper;@Overridepublic List<Task> selectAllTask() {return taskMapper.selectAllTask();}@Overridepublic int updateTask(Task task) {System.out.println(task.toString());return taskMapper.updateTask(task);}@Overridepublic int insertTask(Task task) {return taskMapper.insertTask(task);}@Overridepublic Task selectTaskByNameAndGroup(String taskName, String groupName) {return taskMapper.selectTaskByNameAndGroup(taskName, groupName);}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bit.pro1_1.core.mapper.TaskMapper"><resultMap id="TaskResultMap" type="cn.bit.pro1_1.core.Task"><result property="id" column="id"/><result property="taskName" column="task_name"/><result property="taskGroup" column="task_group"/><result property="type" column="type"/><result property="beanName" column="bean_name"/><result property="className" column="class_name"/><result property="path" column="path"/><result property="methodName" column="method_name"/><result property="params" column="params"/><result property="cronExpression" column="cron_expression"/><result property="description" column="description"/><result property="status" column="status"/><result property="result" column="result"/></resultMap><insert id="insertTask">INSERT INTO tb_task (task_name,task_group,type,bean_name,class_name,path,method_name,params,cron_expression,description,status,result) VALUES (#{task.taskName},#{task.taskGroup},#{task.type},#{task.beanName},#{task.className},#{task.path},#{task.methodName},#{task.params},#{task.cronExpression},#{task.description},#{task.status},#{task.result})</insert><select id="selectAllTask" resultMap="TaskResultMap">SELECTid,task_name,task_group,type,bean_name,class_name,path,method_name,params,cron_expression,description,status,resultFROM tb_task;</select><select id="selectTaskByNameAndGroup" resultType="cn.bit.pro1_1.core.Task">select * from tb_taskwheretask_name = #{name} andtask_group = #{group}</select><update id="updateTask">UPDATE tb_taskSETtype = #{task.type},bean_name = #{task.beanName},class_name = #{task.className},path = #{task.path},method_name = #{task.methodName},params = #{task.params},cron_expression = #{task.cronExpression},description = #{task.description},status = #{task.status},result = #{task.result}WHEREtask_name = #{task.taskName} andtask_group = #{task.taskGroup};</update>
</mapper>
并将之前TaskManager中与持久层相关注释还原
package cn.bit.pro1_1.core;import cn.bit.pro1_1.core.enums.TaskStatus;
import cn.bit.pro1_1.core.service.TaskService;import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.List;@Slf4j
@Service
@AllArgsConstructor
public class TaskManager {private Scheduler scheduler;private TaskService taskService;@PostConstructpublic void init() throws SchedulerException {log.info("TaskManager初始化开始...");List<Task> tasks = taskService.selectAllTask();if(tasks != null && !tasks.isEmpty()) {for (Task task : tasks){addOrUpdateTask(task);}log.info("共加载{}项任务", tasks.size());}log.info("TaskManager初始化结束...");}public static JobKey getJobKey(@NonNull Task task) {return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());}public static TriggerKey getTriggerKey(@NonNull Task task) {return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());}public void addOrUpdateTask(@NonNull Task task) throws SchedulerException {CronTrigger trigger = null;JobKey jobKey = getJobKey(task);TriggerKey triggerKey = getTriggerKey(task);trigger = (CronTrigger) scheduler.getTrigger(triggerKey);// 判断触发器是否存在(如果存在说明之前运行过但是在当前被禁用了,如果不存在说明一次都没运行过)if (trigger == null) {// 新建一个工作任务 指定任务类型为串接进行的JobDetail jobDetail = JobBuilder.newJob(SysJob.class).withIdentity(jobKey).build();// 将任务信息添加到任务信息中jobDetail.getJobDataMap().put("task",task);// 将cron表达式进行转换CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());// 创建触发器并将cron表达式对象给塞入trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();// 在调度器中将触发器和任务进行组合scheduler.scheduleJob(jobDetail, trigger);if(taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()) == null) {taskService.insertTask(task);}elsetaskService.updateTask(task);}else {CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());// 按照新的规则进行trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();// 将任务信息更新到任务信息中trigger.getJobDataMap().put("task", task);// 重启scheduler.rescheduleJob(triggerKey, trigger);taskService.updateTask(task);}// 如任务状态为暂停if (task.getStatus().equals(TaskStatus.PAUSE.getCode())) {this.pauseJob(task);}}public void pauseJob(@NonNull Task task) throws SchedulerException {scheduler.pauseJob(getJobKey(task));task.setStatus(TaskStatus.PAUSE.getCode());taskService.updateTask(task);}
}
并在controller定义接口,通过请求体获取task
package cn.bit.pro1_1.controller;import cn.bit.pro1_1.core.Task;
import cn.bit.pro1_1.core.TaskManager;
import cn.bit.pro1_1.core.exception.TaskException;
import cn.bit.pro1_1.core.handler.ITaskHandler;
import cn.bit.pro1_1.core.handler.TaskHandlerFactory;
import cn.bit.pro1_1.service.DateService;
import lombok.AllArgsConstructor;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/quartz")
@AllArgsConstructor
public class QuartzController {private TaskManager taskManager;@GetMapping("/test")public String test() throws SchedulerException {//从标准SchedulerFactory获取默认schedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();JobDetail job = JobBuilder.newJob(DateService.class).withIdentity("job1", "group1").build();Trigger trigger1 = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();scheduler.scheduleJob(job, trigger1);scheduler.start();return "ok";}@GetMapping("/test1")public String test1(){Task task = new Task();task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}@GetMapping("/test2")public String test2(){Task task = new Task();task.setType(1);task.setClassName("cn.bit.pro1_1.service.TestJavaService");task.setMethodName("test");task.setParams("test");ITaskHandler taskHandler = TaskHandlerFactory.getTaskHandler(task);try {taskHandler.invoke(task);} catch (TaskException e) {e.printStackTrace();return "error";}return "success";}@GetMapping("/start")public String start() throws SchedulerException {Task task = new Task();task.setTaskName("task1");task.setTaskGroup("group1");task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");task.setCronExpression("*/5 * * * * ?");task.setStatus(1);taskManager.addOrUpdateTask(task);return "ok";}@GetMapping("/pause")public String pause() throws SchedulerException {Task task = new Task();task.setTaskName("task1");task.setTaskGroup("group1");task.setType(2);task.setBeanName("test");task.setMethodName("test");task.setParams("test");task.setCronExpression("*/5 * * * * ?");task.setStatus(1);taskManager.pauseJob(task);return "ok";}@PostMapping("/task")public String addOrUpdateTask(@RequestBody Task task) throws SchedulerException {taskManager.addOrUpdateTask(task);return "ok";}
}
同时为方便观察调用时间将原有输出test添加上时间
启动服务
使用postman发请求,注意此时发送的任务状态为暂停
同时数据库中增加了一条任务
将status改为1
定时任务被调用,目前是每次update任务是立刻执行一次,为bug后续会修复,可关注。
同时数据库状态改变
改变调用时间从10秒一次变成5秒一次
关掉服务器,再重启服务,从数据库中加载定时任务并执行。
最后:
目前定时模块的基本需求已经能够满足,由于模块借鉴实习公司的模块,持久化部分还存在小bug,先把基础思路和实现发出来作为一个抛砖引玉的作用。bug主要集中在持久层部分,例如在spring项目运行时无法修改调用的方法,公司的处理方法是直接暴力在前端杜绝更改,不过修改其实也很简单在addOrupdate部分针对已有任务移除并重新添加一次即可,但个人认为将add和update操作放在一个函数里导致这个bug的根本原因,同时也因为这个二合一的函数导致我持久层的代码改了四五遍,后续更新Pro1-2章会修复这个问题,同时添加日志异步等功能,Pro1-3章计划是把quartz模块添加到前言中提到的之前的微服务项目中。欢迎大家关注和收藏,你们的支持是我更新的最大动力。