前言
在我们的springboot项目中,有很多种实现定时任务的方式
有用最简单的
@Scheduled
实现定时任务,即:
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
@EnableScheduling
public class ScheduledTasks {// 固定延迟任务@Scheduled(fixedDelay = 5000)public void fixedDelayTask() {System.out.println("任务以固定的5秒延迟执行");}// 固定频率任务@Scheduled(fixedRate = 5000)public void fixedRateTask() {System.out.println("任务以固定的每5秒执行");}// Cron 表达式任务@Scheduled(cron = "0 0/5 * * * ?")public void cronTask() {System.out.println("任务以固定的每5分钟执行");}
}
使用
TaskScheduler
动态创建任务
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration
public class SchedulerConfig {@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("scheduler-");return scheduler;}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.concurrent.ScheduledFuture;@Component
public class DynamicScheduledTask {@Autowiredprivate TaskScheduler taskScheduler;private ScheduledFuture<?> scheduledFuture;@PostConstructpublic void scheduleTask() {scheduledFuture = taskScheduler.schedule(this::task, new CronTrigger("0 0/5 * * * ?"));}public void task() {System.out.println("定时任务执行");}
}
除此外还有各种定时框架
Quartz
XXL-JOB
Elastic-Job
当然还有其他,但是本次将会着重讲解如何用Quartz定时框架
和数据库
来控制定时任务的执行
与否以及执行时间长度
🎈🎈🎈🎈🎈🎈🎈🎈🎈🎈
Quartz定时框架
Quartz 是一个开源的调度框架,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能,它有很多优点
强大的调度功能
灵活的应用方式,支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
分布式和集群能力
缺点也是有的
分布式支持不友好,没有内置 UI 管理控制台
、相较于其他框架使用麻烦。
如下为Quartz相关的链接
Quartz 框架
Quzrtz框架官方文档
除此外,在 Quartz 体系结构中,有三个组件非常重要
Scheduler
:调度器。使用Scheduler启动Trigger去执行Job。
Trigger
:触发器。用来定义 Job(任务)触发条件、触发时间,触发间隔,终止时间等。
有四大类型:SimpleTrigger(简单的触发器)、CornTrigger(Cron表达式触发器)、DateIntervalTrigger(日期触发器)、CalendarIntervalTrigger(日历触发器)。当然,本次主要以SimpleTrigger和CornTrigger作为演示
Job
:任务。具体要执行的业务逻辑,需要定时执行的逻辑
虽然我们本文章说用数据库的方式进行和Quartz进行配置,但是它也能用本地配置的的方式执行,详情如下:
类型 | 优点 | 缺点 |
---|---|---|
RAM作业存储 | 不要外部数据库,配置容易,运行速度快 | 因为调度程序信息是存储在被分配给 JVM 的内存里面,所以,当应用程序停止运行时,所有调度信息将被丢失。另外因为存储到JVM内存里面,所以可以存储多少个 Job 和 Trigger 将会受到限制 |
JDBC 作业存储 | 支持集群,因为所有的任务信息都会保存到数据库中,可以控制事物,还有就是如果应用服务器关闭或者重启,任务信息都不会丢失,并且可以恢复因服务器关闭或者重启而导致执行失败的任务 | 运行速度的快慢取决与连接数据库的快慢 |
✨✨✨✨✨✨✨✨✨
执行步骤
重要MAVEN依赖
此处给出Quartz的依赖,依赖版本随自己项目的springboot项目版本
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId><version>2.6.2</version></dependency>
建表与实体
Quartz虽然也有它自己的表,可以使用它的内置的数据库表进行搭建定时任务,但我追求简洁,方便,因此我本次自定义建表
CREATE TABLE `task_config` (`handle` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '主键',`task_id` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '定时任务编号',`task_name` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '定时任务名称',`module_name` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '执行分类',`enable` tinyint(1) NOT NULL COMMENT '任务是否执行',`task_run_type` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '定时执行方式,固定速率FIX或者corn表达式CORN',`task_run_fix` int(11) DEFAULT NULL COMMENT '固定速率时读取的配置',`task_run_corn` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'corn表达式时候读取的配置',`create_user` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '创建人',`delete_flag` tinyint(1) NOT NULL COMMENT '是否任务失效',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_user` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '更新人',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`handle`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
对应建立实体如下:
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;@Getter
@Setter
@ToString
public class TaskConfig {private String handle;private String taskId;private String taskName;private String moduleName;private String enable;private String taskRunType;private Integer taskRunFix;private String taskRunCorn;private String createUser;private String deleteFlag;private LocalDateTime createTime;private String updateUser;private LocalDateTime updateTime;
}
JDBC数据库DAO
✒️✒️✒️✒️✒️✒️✒️✒️✒️
在我的项目中是使用JDBC的数据库连接方式,你们如果和我的处理方式不一致,可使用自己的数据库连接代码
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import com.hxc.DateUtils;
import com.hxc.Where;import com.hxc.user.baseTable.TaskConfig;public class TaskConfigDao {private static final String SQL_INS = "INSERT INTO task_config(handle,task_id,task_name,module_name,enable,task_run_type,task_run_fix,task_run_corn,create_user,delete_flag,create_time,update_user,update_time,handle,task_id,task_name,module_name,enable,task_run_type,task_run_fix,task_run_corn,create_user,delete_flag,create_time,update_user,update_time) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";private static final String SQL_UPD = "UPDATE task_config SET task_id=?,task_name=?,module_name=?,enable=?,task_run_type=?,task_run_fix=?,task_run_corn=?,create_user=?,delete_flag=?,create_time=?,update_user=?,update_time=?,task_id=?,task_name=?,module_name=?,enable=?,task_run_type=?,task_run_fix=?,task_run_corn=?,create_user=?,delete_flag=?,create_time=?,update_user=?,update_time=? WHERE handle=? AND handle=?";private static final String SQL_SEL = "SELECT handle,task_id,task_name,module_name,enable,task_run_type,task_run_fix,task_run_corn,create_user,delete_flag,create_time,update_user,update_time,handle,task_id,task_name,module_name,enable,task_run_type,task_run_fix,task_run_corn,create_user,delete_flag,create_time,update_user,update_time FROM task_config ";private final Connection conn;public TaskConfigDao(Connection conn) {this.conn = conn;}public int insert(TaskConfig data) {try (PreparedStatement ps = this.conn.prepareStatement(SQL_INS)) {ps.setString(1, data.getHandle());ps.setString(2, data.getTaskId());ps.setString(3, data.getTaskName());ps.setString(4, data.getModuleName());ps.setString(5, data.getEnable());ps.setString(6, data.getTaskRunType());ps.setObject(7, data.getTaskRunFix());ps.setString(8, data.getTaskRunCorn());ps.setString(9, data.getCreateUser());ps.setString(10, data.getDeleteFlag());DateUtils.setDateTz(ps, 11, data.getCreateTime());ps.setString(12, data.getUpdateUser());DateUtils.setDateTz(ps, 13, data.getUpdateTime());ps.setString(14, data.getHandle());ps.setString(15, data.getTaskId());ps.setString(16, data.getTaskName());ps.setString(17, data.getModuleName());ps.setString(18, data.getEnable());ps.setString(19, data.getTaskRunType());ps.setObject(20, data.getTaskRunFix());ps.setString(21, data.getTaskRunCorn());ps.setString(22, data.getCreateUser());ps.setString(23, data.getDeleteFlag());DateUtils.setDateTz(ps, 24, data.getCreateTime());ps.setString(25, data.getUpdateUser());DateUtils.setDateTz(ps, 26, data.getUpdateTime());return ps.executeUpdate();} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public int insert(List<TaskConfig> dataList){try (PreparedStatement ps = this.conn.prepareStatement(SQL_INS)) {for(TaskConfig data : dataList) {ps.setString(1, data.getHandle());ps.setString(2, data.getTaskId());ps.setString(3, data.getTaskName());ps.setString(4, data.getModuleName());ps.setString(5, data.getEnable());ps.setString(6, data.getTaskRunType());ps.setObject(7, data.getTaskRunFix());ps.setString(8, data.getTaskRunCorn());ps.setString(9, data.getCreateUser());ps.setString(10, data.getDeleteFlag());DateUtils.setDateTz(ps, 11, data.getCreateTime());ps.setString(12, data.getUpdateUser());DateUtils.setDateTz(ps, 13, data.getUpdateTime());ps.setString(14, data.getHandle());ps.setString(15, data.getTaskId());ps.setString(16, data.getTaskName());ps.setString(17, data.getModuleName());ps.setString(18, data.getEnable());ps.setString(19, data.getTaskRunType());ps.setObject(20, data.getTaskRunFix());ps.setString(21, data.getTaskRunCorn());ps.setString(22, data.getCreateUser());ps.setString(23, data.getDeleteFlag());DateUtils.setDateTz(ps, 24, data.getCreateTime());ps.setString(25, data.getUpdateUser());DateUtils.setDateTz(ps, 26, data.getUpdateTime());ps.addBatch();}return ps.executeBatch().length;} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public int update(TaskConfig data){try (PreparedStatement ps = this.conn.prepareStatement(SQL_UPD)) {ps.setString(1, data.getTaskId());ps.setString(2, data.getTaskName());ps.setString(3, data.getModuleName());ps.setString(4, data.getEnable());ps.setString(5, data.getTaskRunType());ps.setObject(6, data.getTaskRunFix());ps.setString(7, data.getTaskRunCorn());ps.setString(8, data.getCreateUser());ps.setString(9, data.getDeleteFlag());DateUtils.setDateTz(ps, 10, data.getCreateTime());ps.setString(11, data.getUpdateUser());DateUtils.setDateTz(ps, 12, data.getUpdateTime());ps.setString(13, data.getTaskId());ps.setString(14, data.getTaskName());ps.setString(15, data.getModuleName());ps.setString(16, data.getEnable());ps.setString(17, data.getTaskRunType());ps.setObject(18, data.getTaskRunFix());ps.setString(19, data.getTaskRunCorn());ps.setString(20, data.getCreateUser());ps.setString(21, data.getDeleteFlag());DateUtils.setDateTz(ps, 22, data.getCreateTime());ps.setString(23, data.getUpdateUser());DateUtils.setDateTz(ps, 24, data.getUpdateTime());ps.setString(25, data.getHandle());ps.setString(26, data.getHandle());return ps.executeUpdate();} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public int update(List<TaskConfig> dataList){try (PreparedStatement ps = this.conn.prepareStatement(SQL_UPD)) {for(TaskConfig data : dataList) {ps.setString(1, data.getTaskId());ps.setString(2, data.getTaskName());ps.setString(3, data.getModuleName());ps.setString(4, data.getEnable());ps.setString(5, data.getTaskRunType());ps.setObject(6, data.getTaskRunFix());ps.setString(7, data.getTaskRunCorn());ps.setString(8, data.getCreateUser());ps.setString(9, data.getDeleteFlag());DateUtils.setDateTz(ps, 10, data.getCreateTime());ps.setString(11, data.getUpdateUser());DateUtils.setDateTz(ps, 12, data.getUpdateTime());ps.setString(13, data.getTaskId());ps.setString(14, data.getTaskName());ps.setString(15, data.getModuleName());ps.setString(16, data.getEnable());ps.setString(17, data.getTaskRunType());ps.setObject(18, data.getTaskRunFix());ps.setString(19, data.getTaskRunCorn());ps.setString(20, data.getCreateUser());ps.setString(21, data.getDeleteFlag());DateUtils.setDateTz(ps, 22, data.getCreateTime());ps.setString(23, data.getUpdateUser());DateUtils.setDateTz(ps, 24, data.getUpdateTime());ps.setString(25, data.getHandle());ps.setString(26, data.getHandle());ps.addBatch();}return ps.executeBatch().length;} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public int delete(String handle){try (PreparedStatement ps = this.conn.prepareStatement("DELETE FROM task_config WHERE handle=?")) {ps.setString(1, handle);ps.setString(2, handle);return ps.executeUpdate();} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public List<TaskConfig> selectAll(){ArrayList<TaskConfig> result = new ArrayList<TaskConfig>();try (PreparedStatement ps = this.conn.prepareStatement(SQL_SEL)) {ResultSet rs = ps.executeQuery();while(rs.next()) {result.add(convert(rs));}return result;} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public TaskConfig selectByPK(String handle){TaskConfig result = null;try (PreparedStatement ps = this.conn.prepareStatement(SQL_SEL + "WHERE handle=?")) {ps.setString(1, handle);ps.setString(2, handle);ResultSet rs = ps.executeQuery();if(rs.next()) {result = convert(rs);}return result;} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}public List<TaskConfig> selectWhere(Where where){List<TaskConfig> result = new ArrayList<>();try (PreparedStatement ps = where.prepareStatement(conn, SQL_SEL)) {ResultSet rs = ps.executeQuery();while(rs.next()) {result.add(convert(rs));}return result;} catch (SQLException e) {throw new IllegalStateException("数据库查询错误, " + e.getMessage(), e);}}private TaskConfig convert(ResultSet rs) throws SQLException {TaskConfig data = new TaskConfig();int index = 1;data.setHandle(rs.getString(index++));data.setTaskId(rs.getString(index++));data.setTaskName(rs.getString(index++));data.setModuleName(rs.getString(index++));data.setEnable(rs.getString(index++));data.setTaskRunType(rs.getString(index++));data.setTaskRunFix((Integer)rs.getObject(index++));data.setTaskRunCorn(rs.getString(index++));data.setCreateUser(rs.getString(index++));data.setDeleteFlag(rs.getString(index++));data.setCreateTime(DateUtils.getDateTz(rs, index++));data.setUpdateUser(rs.getString(index++));data.setUpdateTime(DateUtils.getDateTz(rs, index++));data.setHandle(rs.getString(index++));data.setTaskId(rs.getString(index++));data.setTaskName(rs.getString(index++));data.setModuleName(rs.getString(index++));data.setEnable(rs.getString(index++));data.setTaskRunType(rs.getString(index++));data.setTaskRunFix((Integer)rs.getObject(index++));data.setTaskRunCorn(rs.getString(index++));data.setCreateUser(rs.getString(index++));data.setDeleteFlag(rs.getString(index++));data.setCreateTime(DateUtils.getDateTz(rs, index++));data.setUpdateUser(rs.getString(index++));data.setUpdateTime(DateUtils.getDateTz(rs, index++));return data;}
}
定时配置与更新
🖊️🖊️🖊️🖊️🖊️🖊️🖊️🖊️🖊️🖊️
import java.sql.Connection;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.CronScheduleBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.List;
import org.springframework.context.annotation.Configuration;@Configuration
public class QuartzConfig {@Autowiredprivate Scheduler scheduler;@Autowiredprivate PrimeDB primeDB;@PostConstructpublic void init() {try (Connection conn = primeDB.create()) {TaskConfigDao taskConfigDao = new TaskConfigDao(conn);List<TaskConfig> configs = taskConfigDao.selectAll();for (TaskConfig config : configs) {if ("0".equals(config.getEnable())) {// 如果任务未启用,则跳过continue;}if (uploadQuartzScheduledGroup(config)) {continue; // 无效的调度方式}}} catch (Exception e) {e.printStackTrace();}}public boolean uploadQuartzScheduledGroup(TaskConfig config) throws SchedulerException {String jobGroup = "jobGroup_" + config.getTaskId(); // 根据需要设置任务组String triggerGroup = "triggerGroup_" + config.getTaskId(); // 根据需要设置触发器组JobKey jobKey = new JobKey(config.getTaskId(), jobGroup);if (scheduler.checkExists(jobKey)) {scheduler.deleteJob(jobKey);}JobDetail jobDetail = JobBuilder.newJob(TaskJob.class).withIdentity(config.getTaskId(), jobGroup) // 指定任务组.build();Trigger trigger;switch (config.getTaskRunType()) {case "FIX":trigger = TriggerBuilder.newTrigger().withIdentity(config.getTaskId() + "_trigger", triggerGroup) // 指定触发器组.startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInMilliseconds(config.getTaskRunFix()).repeatForever()).build();break;case "CRON":trigger = TriggerBuilder.newTrigger().withIdentity(config.getTaskId() + "_trigger", triggerGroup) // 指定触发器组.withSchedule(CronScheduleBuilder.cronSchedule(config.getTaskRunCorn())).build();break;default:return true;}scheduler.scheduleJob(jobDetail, trigger);return false;}
}
我的逻辑使用基础的
ScheduledTasks
定时读取一次数据库的定时任务配置表实时更新我们的定时执行
import java.sql.Connection;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@EnableScheduling
@Component
@Slf4j
public class TaskUpdater {@Autowiredprivate Scheduler scheduler;@Autowiredprivate PrimeDB primeDB;@Autowiredprivate QuartzConfig quartzConfig;@Scheduled(fixedRate = 60000) // 每分钟检查一次public void updateTasks() {System.out.println("更新定时配置");try (Connection conn = primeDB.create()) {TaskConfigDao taskConfigDao = new TaskConfigDao(conn);List<TaskConfig> configs = taskConfigDao.selectAll();for (TaskConfig config : configs) {if ("0".equals(config.getEnable())) {// 如果任务未启用,则删除任务JobKey jobKey = new JobKey(config.getTaskId(), "jobGroup_" + config.getTaskId());if (scheduler.checkExists(jobKey)) {scheduler.deleteJob(jobKey);}continue;}// 调用封装的启用定时逻辑quartzConfig.uploadQuartzScheduledGroup(config);}} catch (Exception e) {log.error("定时同步定时任务设定失败:" + e);}}
}
定时业务逻辑
✏️✏️✏️✏️✏️✏️✏️✏️✏️✏️✏️
接下来我举例两个例子,一个是基础的固定速率执行,一个是使用corn表达式执行
业务逻辑执行主入口
import java.util.HashMap;
import java.util.Map;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;@Component
public class TaskJob implements Job {private final Map<String, TaskHandler> handlers;@Overridepublic void execute(JobExecutionContext context) {String taskId = context.getJobDetail().getKey().getName();TaskHandler handler = handlers.get(taskId);if (handler != null) {handler.handle();} else {System.out.println("没有找到处理逻辑对应的任务 ID: " + taskId);}}public TaskJob() {handlers = new HashMap<>();handlers.put("AA", new AATaskHandler());handlers.put("BB", new BBTaskHandler());}
}
这里,我写了两个例子,一个是
AA
任务ID,和BB
任务ID
后续如果有新的业务逻辑,只需要往如下代码新增业务逻辑即可
public TaskJob() {handlers = new HashMap<>();handlers.put("AA", new AATaskHandler());handlers.put("BB", new BBTaskHandler());handlers.put("CC", new BBTaskHandler());// 更多其他....}
那么我对应的AA和BB业务逻辑如下:
在之前先需要写一个interface
实现继承来达到加入列表的目的
public interface TaskHandler {void handle();
}
public class AATaskHandler implements TaskHandler{@Overridepublic void handle() {System.out.println("AA定时任务执行");}
}
public class BBTaskHandler implements TaskHandler{@Overridepublic void handle() {System.out.println("BB定时任务执行");}
}
数据库数据新增
我们在逻辑里已经编写了两个业务逻辑,那么对应我们的表数据如下:
INSERT INTO primedata.task_config
(handle, task_id, task_name, module_name, enable, task_run_type, task_run_fix, task_run_corn, create_user, delete_flag, create_time, update_user, update_time)
VALUES('111', 'AA', 'AA', 'AA', 1, 'FIX', 1000, NULL, NULL, 0, NULL, NULL, NULL);INSERT INTO primedata.task_config
(handle, task_id, task_name, module_name, enable, task_run_type, task_run_fix, task_run_corn, create_user, delete_flag, create_time, update_user, update_time)
VALUES('222', 'BB', 'BB', 'BB', 1, 'CRON', NULL, '0/1 * * * * ? ', NULL, 0, NULL, NULL, NULL);
也就是
新增一个AA
任务ID,挂载在AA
任务分组下,使用固定速率每1秒执行一次
新增一个BB
任务ID,挂载在BB
任务分组下,使用CRON表达式每1秒执行一次
此时启动项目,控制台输出:
AA和BB正常执行
假如此时修改AA任务ID,变成3秒执行一次:
等待更新配置之后:
可以看到执行逻辑已经
更新为3秒一次
更新关闭BB任务ID:
重要
: 如上,我们只要更新了任务的状态
,以及任务的执行时间,便能够简单快捷的实现定时任务的控制。
另外再额外配合开发一个页面做增删改查
,便可以实现界面管理我们的定时任务调度
结语:
如上便是我们的springboot+Quartz通过数据库控制定时任务执行与时间具体的实现过程,如有遗漏后续会更新