数据库表结构:
YyJobInfo:
public class YyJobInfo {//定时任务idprivate int id;//该定时任务所属的执行器的idprivate int jobGroup;//定时任务描述private String jobDesc;//定时任务添加的时间private Date addTime;//定时任务的更新时间private Date updateTime;//负责人private String author;//报警邮件private String alarmEmail;//调度类型private String scheduleType;//一般为调度的cron表达式private String scheduleConf;//定时任务的失败策略private String misfireStrategy;//定时任务的路由策略private String executorRouteStrategy;//JobHandler的名称private String executorHandler;//执行器参数private String executorParam;//定时任务阻塞策略private String executorBlockStrategy;//执行超时时间private int executorTimeout;//失败重试次数private int executorFailRetryCount;//定时任务运行类型private String glueType;//glue的源码private String glueSource;//glue备注private String glueRemark;//glue更新时间private Date glueUpdatetime;//子任务idprivate String childJobId;//定时任务触发状态,0为停止,1为运行private int triggerStatus;//最近一次的触发时间private long triggerLastTime;//下一次的触发时间private long triggerNextTime;
}
项目分层:
核心代码:
com.yy.yyjob.admin.core.thread.JobScheduleHelper
任务调度代码(有些逻辑采用模拟逻辑,后续会逐渐补全):
@Component
public class JobScheduleHelper {@Resourceprivate YyJobInfoDao yyJobInfoDao;// 调度定时任务的线程private Thread scheduleThread;// 创建当前类的对象private static JobScheduleHelper instance = new JobScheduleHelper();// 把当前类的对象暴露出去public static JobScheduleHelper getInstance(){return instance;}// 启动调度线程工作的方法public void start(){scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {while (true){// 从数据库中查询所有定时任务信息List<YyJobInfo> yyJobInfoList = YyJobAdminConfig.getAdminConfig().getYyJobInfoDao().findAll();// 得到当前时间long time = System.currentTimeMillis();// 遍历所有定时任务信息for (YyJobInfo yyJobInfo : yyJobInfoList) {if (time > yyJobInfo.getTriggerNextTime()){// 如果大于就执行定时任务,在这里就选用集合的第一个地址System.out.println("通知address服务器,去执行定时任务");// 计算定时任务下一次的执行时间Date nextTime = null;try {nextTime = new CronExpression(yyJobInfo.getScheduleConf()).getNextValidTimeAfter(new Date());} catch (ParseException e) {e.printStackTrace();}// 下面就是更新数据库中定时任务的操作YyJobInfo job = new YyJobInfo();job.setTriggerNextTime(nextTime.getTime());System.out.println("保存job信息");}}}}});scheduleThread.start();}
数据库配置代码:
/*** @Description:* 这个类可以说是服务端的启动入口,该类实现了Spring的InitializingBean接口,* 所以该类中的 afterPropertiesSet方法会在容器中的bean初始化完毕后被回掉。回掉的过程中会创建xxl-job中最重要的块慢线程池* 同时也会启动xxl-job中的时间轮*/
@Component
public class YyJobAdminConfig implements InitializingBean, DisposableBean {//当前类的引用,看到这里就应该想到单例模式了//xxl-job中所有组件都是用了单例模式,通过public的静态方法把对象暴露出去private static YyJobAdminConfig adminConfig = null;//获得当前类对象的静态方法public static YyJobAdminConfig getAdminConfig() {return adminConfig;}@Overridepublic void afterPropertiesSet() throws Exception {//为什么这里可以直接赋值呢?还是和spring的bean对象的初始化有关,XxlJobAdminConfig添加了@Component//注解,所以会作为bean被反射创建,创建的时候会调用无参构造器//而afterPropertiesSet方法是在容器所有的bean初始化完成式才会被回掉,所以这时候XxlJobAdminConfig对象已经//创建完成了,直接赋值this就行adminConfig = this;}@Overridepublic void destroy() throws Exception {}@Resourceprivate YyJobInfoDao yyJobInfoDao;@Resourceprivate DataSource dataSource;public YyJobInfoDao getYyJobInfoDao() {return yyJobInfoDao;}public DataSource getDataSource() {return dataSource;}
总结:
目前调度中心单独部署在一台服务器中,两个定时任务程序各自部署在不同的服务器上。调度中心和定时任务程序启动的时候,会先启动调度中心,然后再启动定时任务程序。这样,定时任务程序才能把定时任务的信息和定时任务部署的服务器的ip地址发送给调度中心,然后调度中心把这些信息记录在数据库中。然后,调度中心的调度任务的线程就会开始在一个while循环中不断地扫描数据库,查找数据库能够执行的定时任务,如果有定时任务到了执行时间了,就远程通知执行定时任务的程序执行定时任务,接着在计算定时任务下一次的执行时间,记录时间到数据库中。