从源码角度解读xxl-job的工作流程

剖析xxl-job的源码——了解其实现细节与优化策略

  • 设计思想🧠
  • 服务端-源码探究 🔍
    • 1. 初始化触发器线程池
    • 2. 维护注册信息
    • 3. 运行失败监视器
    • 4. 将丢失主机信息调度日志更改状态
    • 5. 统计一些失败成功报表,删除过期日志
    • 6. 执行调度器[核心]
  • 客户端-源码探究🔍
    • 1. 初始化调度器资源管理器
    • 2. 刷新 GlueFactory
    • 3. 启动类[核心]
      • 3.1 初始化日志文件
      • 3.2 封装调度中心请求路径,用于访问调度中心
      • 3.3 清除过期日志
      • 3.4 回调调度中心任务执行状态
      • 3.5 初始化并且执行内嵌服务
  • 服务注册🐷📖
    • 第一步、客户端向服务端发送注册请求
    • 第二步、服务端接收注册请求
    • 第三步、客户端收到请求
  • 执行任务🔥
    • 第一步、服务端向客户端发送post请求
    • 第二步、客户端接收调度任务
    • 第三步、客户端回调线程通过post请求访问 服务端
    • 第四步、服务端收到回调请求,更改job最终结果
    • 第五步、服务端针对超时任务设置失败结果

设计思想🧠

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

  • 调度模块(调度中心)
    通过调度器调度具体执行器,执行器收到调度请求执行JobHandler任务处理器,实时将执行日志返回给调度中心。执行完任务将任务方法执行结果返回给调度中心。调度中心执行回调方法,写入调度日志,进行日志管理。可以生成调度的运行报表,或者调度失败进行告警等等。

    • 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
    • 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover(故障转移)。
  • 执行模块(执行器)
    执行器就是我们的业务服务,一个执行器中存在多个任务。执行器在启动的时候通过注册信息向调度中心发送注册请求,建立心跳连接,通过执行器管理。执行器中存在多个任务,使用任务管理进行管理。

    • 负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
    • 接收“调度中心”的执行请求、终止请求和日志请求等。

时序图

在这里插入图片描述

系统架构图

在这里插入图片描述

执行流程

在这里插入图片描述

服务端-源码探究 🔍

首先找到配置类 XxlJobScheduler, 可以发现该类实现了InitializingBean接口,在 Spring 容器中,当一个 bean 实现了 InitializingBean 接口时,在 bean 初始化过程中会调用它的 afterPropertiesSet() 方法。这里直接看afterPropertiesSet()方法即可:

@Override
public void afterPropertiesSet() throws Exception {//利用静态声明的只会加载一次的特性,初始化一个单例对象。adminConfig = this;//初始化xxjob调度器xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();
}

xxlJobScheduler.init()

public void init() throws Exception {// init i18ninitI18n();// admin trigger pool start 初始化触发器线程池JobTriggerPoolHelper.toStart();// admin registry monitor run 30秒执行一次,维护注册表信息, 判断在线超时时间90sJobRegistryHelper.getInstance().start();// admin fail-monitor run 运行事变监视器,主要失败发送邮箱,重试触发器JobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper ) 将丢失主机信息调度日志更改状态JobCompleteHelper.getInstance().start();// admin log report start 统计一些失败成功报表JobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper ) 执行调度器JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");
}

该方法主要做了如下事情:

  1. init i18n,国际化配置的一些东西
  2. 初始化触发器线程池
  3. 维护注册表信息(30秒执行一次)
  4. 将丢失主机信息调度日志更改状态
  5. 统计一些失败成功报表,删除过期日志
  6. 执行调度器

1. 初始化触发器线程池

分别初始化了两个线程池,一个快一个满,优先选择快,当一分钟以内任务超过10次执行时间超过500ms,则加入慢线程池执行。

public static void toStart() {helper.start();
}public void start(){//最大200线程,最多处理1000任务fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}});//最大100线程,最多处理2000任务//一分钟内超时10次,则采用慢触发器执行slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}});
}

2. 维护注册信息

该方法中主要做了三件事,

  1. 初始化 注册或删除 线程池
  2. 维护注册表信息, 从注册表中剔除超时90s的注册机器
  3. 更新xxl_job_group执行器地址列表
public void start() {// for registry or remove 初始化注册或者删除线程池registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});// for monitor 监控连接,30秒执行一次,维护注册表信息, 判断在线超时时间90sregistryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// auto registry group 查询自动注册的数据(这里如果没添加自动注册的数据,则不会进入该方法,然后删除register表中超时注册数据。)List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList != null && !groupList.isEmpty()) {// remove dead address (admin/executor)1、 从注册表中删除超时90s的机器,不分是否自动注册List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids != null && ids.size() > 0) {//从数据库删除注册机器信息【物理删除】XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 获取所有在线机器,注册表: 见"xxl_job_registry"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,// 即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();//维护注册表注册key和注册value// 不分是否自动注册List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item : list) {// 2、将注册类型为EXECUTOR的XxlJobRegistry集合改装成appname=>设置触发器的ip地址if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {//AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// fresh group address 3、更新xxl_job_group执行器地址列表for (XxlJobGroup group : groupList) {List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;//将所有配置触发器的ip地址,使用,拼接if (registryList != null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();for (String item : registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length() - 1);}//更新设置了触发器的ip地址group.setAddressList(addressListStr);//更新修改时间group.setUpdateTime(new Date());//将注册表中appname对应的多条ip地址,整成appname-> ips(IP1,IP2,IP3)格式存储xxl_job_group表中,只针对自动注册。XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});registryMonitorThread.setDaemon(true);registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");registryMonitorThread.start();
}

3. 运行失败监视器

主要就是失败重试和告警!

  1. 查询状态为失败的调度日志记录(调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;)
  2. 轮训执行失败重试和告警。
public void start(){// 这里判断失败有2种情况(trigger_code表示调度中心调用执行器状态,handle_code表示执行器执行结束后回调给调度中心的状态,200均标识成功,500标识失败)monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// monitorwhile (!toStop) {try {// 获取执行失败的日志 调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (failLogIds!=null && !failLogIds.isEmpty()) {for (long failLogId: failLogIds) {// lock log 加锁,乐观修改alarm_status=-1int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet < 1) {continue;}//获取失败日志具体信息XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);//加载job信息XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor 失败重试监听器,若可重试次数>0,则再次执行触发器if (log.getExecutorFailRetryCount() > 0) {JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";log.setTriggerMsg(log.getTriggerMsg() + retryMsg);//修改触发器执行信息XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitor 失败警告监视器int newAlarmStatus = 0;       // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败if (info != null) {//若设置报警邮箱,则执行报警boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);newAlarmStatus = alarmResult?2:3;} else {//没设置报警邮箱,则更改状态为不需要告警newAlarmStatus = 1;}// 释放锁XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");}});monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobFailMonitorHelper");monitorThread.start();
}

4. 将丢失主机信息调度日志更改状态

调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;

public void start(){// for callback 针对回调函数处理的线程池callbackThreadPool = new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");}});// for monitormonitorThread = new Thread(new Runnable() {@Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}// monitorwhile (!toStop) {try {// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;Date losedTime = DateUtil.addMinutes(new Date(), -10);//调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);if (losedJobIds!=null && losedJobIds.size()>0) {for (Long logId: losedJobIds) {XxlJobLog jobLog = new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );//更改处理状态XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");}});monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");monitorThread.start();
}

5. 统计一些失败成功报表,删除过期日志

public void start(){logrThread = new Thread(new Runnable() {//每分钟刷新一次@Overridepublic void run() {// last clean log time 记录上次清除日志时间long lastCleanLogTime = 0;while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {for (int i = 0; i < 3; i++) {// today 分别统计今天,昨天,前天0~24点的数据Calendar itemDay = Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);Date todayFrom = itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);Date todayTo = itemDay.getTime();// refresh log-report every minute//设置默认值XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);//查询失败, 成功,总的调用次数Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (triggerCountMap!=null && triggerCountMap.size()>0) {int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// do refresh//刷新调用次数,若找不到则默认都是0int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret < 1) {//没数据则保存XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);}}// 2、log-clean: switch open & once each day 设置了保留日志天数且日志保留了24小时,则进入if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {// expire-time 通过日志保留天数算出清除log时间Calendar expiredDay = Calendar.getInstance();expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);Date clearBeforeTime = expiredDay.getTime();// clean expired logList<Long> logIds = null;do {//这里传了3个0表示查询所有,而不是单个任务idlogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);//删除过期数据if (logIds!=null && logIds.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (logIds!=null && logIds.size()>0);// update clean timelastCleanLogTime = System.currentTimeMillis();}try {TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");}});logrThread.setDaemon(true);logrThread.setName("xxl-job, admin JobLogReportHelper");logrThread.start();
}

6. 执行调度器[核心]

scheduleThread,首先会加锁,然后查询出下次执行时间在未来5秒以内的所有任务,一次最多获取6000条。然后根据过期时间会分成三种对应处理。

  1. 触发器过期时间 > 5s(当前时间 > 任务下次调度时间 + 5s )
    1. 根据任务调度过期策略判断,如果是 FIRE_ONCE_NOW,即 立即执行一次
    2. 从当前时间开始重新计算下次触发时间;
  2. 触发器过期时间 < 5s (当前时间 > 任务下次调度时间 )
    1. 执行触发器
    2. 更新下次触发时间
    3. 下次触发时间在当前时间往后5s范围内,则加入到 ringData,并再次计算更新下次执行时间
  3. 未来五秒内执行的任务
    1. 任务添加到ringData
    2. 更新下次执行时间

ringThread会一秒执行一次,从ringData获取数据。ringData本质是一个ConcurrentHashMap,容量60,key => 触发器下次执行时间(秒为单位)%60,value => 触发器jobId。这里采用了时间轮的思想,定时任务一秒执行一次,会以当前时间秒往前递推2秒。将数据从ringData取出来,然后执行任务。比如现在时间为0点0分0秒,当前时间秒为0,则会将ringData中索引为59和58的数据捞出来。xxl-job这样做是为了避免处理耗时太长,所以会跨过刻度,多向前校验一个刻度。

在这里插入图片描述

public void start(){// schedule threadscheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {//下5秒之后执行一次,等待服务器启动。TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) 每个触发器花费50ms,每个线程单位时间内处理20任务,最多同时处理300*20=6000任务int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {//设置手动提交conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);//获取任务调度锁表内数据信息,加写锁preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();//获取当前时间后5秒,同时最多负载的分页数List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// time-ring jump 触发器过期时间>5sif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、misfire match- 调度过期策略://   - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;//   - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 trigger 执行触发器JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、触发器过期时间<5s,trigger 执行触发器JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again 下次触发时间在当前时间往后5秒范围内if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second 获取下次执行秒int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next 更新下次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time 未来五秒以内执行的所有任务添加到ringData// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info 更新执行时间和上次执行时间到数据库for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commitif (conn != null) {try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis()-start;// Wait seconds, align secondif (cost < 1000) {  // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period; 若执行成功,下一秒继续执行。执行失败或没查询出数据则5秒执行一次。TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();// ring threadringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataList<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );if (ringItemData.size() > 0) {// do triggerfor (int jobId: ringItemData) {// do triggerJobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");ringThread.start();
}

客户端-源码探究🔍

查看项目的XXLJOB配置,XxlJobConfig类,将XxlJobSpringExecutor交由Spring容器管理

在这里插入图片描述

通过类图可见,实现了 SmartInitializingSingleton 接口,实现该接口的当spring容器初始完成,紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean,最后在实例化阶段结束时触发回调接口。

在这里插入图片描述

so,查看 afterSingletonsInstantiated() 实现方法

@Override
public void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method) 初始化调度器资源管理器initJobHandlerMethodRepository(applicationContext);// refresh GlueFactory 刷新GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}
}
  1. 初始化调度器资源管理器(从spring容器中将标记了XxlJob注解的方法,将其封装并添加到map中。)
  2. 刷新GlueFactory
  3. 启动服务,接收服务器请求。等

1. 初始化调度器资源管理器

  1. 从spring容器获取所有对象,并遍历查找方法上标记XxlJob注解的方法
  2. 将xxljob配置的jobname作为key,对象,反射的执行,初始,销毁方法作为value注册jobHandlerRepository 中
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from method 获取指定Object类型Bean名称集合String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);// 遍历每个容器对象for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {// 获取每个加上注解XxlJob方法annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}//遍历标记了XxlJob注解的方法for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// regist 调用注册任务执行器registJobHandler(xxlJob, bean, executeMethod);}}
}
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}// 获取配置xxjob的触发器名称String name = xxlJob.value();//make and simplify the variables since they'll be called several times laterClass<?> clazz = bean.getClass();String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}// 判断工作处理资源库是否有相同命名if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}// execute method/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}*/executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {// 获取XxlJob标记的方法,配置的init方法initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {//获取XxlJob标记的方法,配置的destroy方法destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandler // 将xxljob配置的jobname作为key,对象,反射的执行,初始,销毁方法作为value 注册 jobHandlerRepository中registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}

2. 刷新 GlueFactory

public static void refreshInstance(int type){if (type == 0) {glueFactory = new GlueFactory();} else if (type == 1) {glueFactory = new SpringGlueFactory();}
}

3. 启动类[核心]

public void start() throws Exception {// init logpath 初始化日志文件XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client 初始化 admin 链接路径存储集合(封装调度中心请求路径,用于访问调度中心)initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThread 清除过期日志JobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThread 回调调度中心任务执行状态TriggerCallbackThread.getInstance().start();// init executor-server 执行内嵌服务initEmbedServer(address, ip, port, appname, accessToken);
}

3.1 初始化日志文件

public static void initLogPath(String logPath){// init 读取配置拼接日志文件路径if (logPath!=null && logPath.trim().length()>0) {logBasePath = logPath;}// mk base dir  日志文件不存在则创建File logPathDir = new File(logBasePath);if (!logPathDir.exists()) {logPathDir.mkdirs();}logBasePath = logPathDir.getPath();// mk glue dir 创建glue目录File glueBaseDir = new File(logPathDir, "gluesource");if (!glueBaseDir.exists()) {glueBaseDir.mkdirs();}glueSrcPath = glueBaseDir.getPath();
}

3.2 封装调度中心请求路径,用于访问调度中心

private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {//多个admin地址以,分隔for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}//将admin地址以及token添加adminBiz中adminBizList.add(adminBiz);}}}
}

3.3 清除过期日志

public void start(final long logRetentionDays){// limit min value 日志最大保存天数<3天,直接退出if (logRetentionDays < 3 ) {return;}//一天执行一次localThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDays  查询目录下所有子文件(包含目录)File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// today 获取今天0点时间Calendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// valid 不是目录跳过if (!childFile.isDirectory()) {continue;} //查询不到'-'则跳过if (childFile.getName().indexOf("-") == -1) {continue;}// file create date 获取文件创建时间,文件都是以年-月-日命名的Date logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}//大于日志最大存活时间则清除if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {//睡眠一天处理TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");}});localThread.setDaemon(true);localThread.setName("xxl-job, executor JobLogFileCleanThread");localThread.start();
}

3.4 回调调度中心任务执行状态

public void start() {// valid 是否有配置admin路径if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {// normal callbackwhile(!toStop){try {//获取回调参数HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();// 移除队列中所有元素到callbackParamList中int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if error 通知adminif (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();}

3.5 初始化并且执行内嵌服务

该方法主要是初始了内嵌服务并且启动,内嵌服务主要是基于Netty实现通信,对外暴露端口,并且向服务端注册且维护心跳以及向服务端申请服务下线等业务。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip port 获取端口port = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate address 获取admin地址if (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// start 启动内嵌服务器embedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);
}
public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline()//  校验协议规范.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle 解码.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bind 绑定端口ChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registry 注册服务,30s执行一次startRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();
}
public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);
}

服务注册🐷📖

第一步、客户端向服务端发送注册请求

客户端启动注册线程(30s执行一次),向服务端发送注册请求。com.xxl.job.core.thread.ExecutorRegistryThread#start

在这里插入图片描述

第二步、服务端接收注册请求

进行异步注册(维护xxl_job_registry注册表信息,未注册则新增记录,已注册则更新时间),返回结果TRUE

在这里插入图片描述

第三步、客户端收到请求

客户端收到请求,打印请求日志

在这里插入图片描述

执行任务🔥

流程大致如下:

  1. 服务端 向客户端发起post请求

  2. 客户端 通过内嵌服务netty接收,异步线程处理。找到job绑定的线程,将任务丢到阻塞队列中。

  3. 客户端 返回结果给 服务端。

  4. 服务端 收到结果,更改任务状态。

  5. 客户端执行任务后,将执行结果丢到回调线程的阻塞队列处理。

  6. 回调线程通过post请求访问 服务端,服务端 更改job最终结果。

    倘若超过10分钟调度中心没收到回调线程的请求,则设置job最终结果失败。

第一步、服务端向客户端发送post请求

主动触发和手动触发最终都会调用该方法com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger

public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data 从数据库获取任务详情信息XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}if (executorParam != null) {// 设置任务参数jobInfo.setExecutorParam(executorParam);}// 获取失败重试次数int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 获取job分组信息XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList 设置地址集合if (addressList!=null && addressList.trim().length()>0) {group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param 拆分executorParam任务参数,填入shardingParam数组int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//如果路由策略是分片广播模式,同时注册地址不为空时,遍历执行每个注册地址,集群广播for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 执行触发器processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param  获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy// 获取路由策略,默认firstExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id 保存执行日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-paramTriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 获取触发器地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { //如果是集群广播模式if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) { //查询匹配地址执行address = group.getRegistryList().get(index);} else { //超过size,则默认执行第一个.address = group.getRegistryList().get(0);}} else {//根据设置的路由策略,执行路由器,获取返回结果routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else { //获取不到注册地址,返回失败值routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {//这里真正的执行触发器triggerResult = runExecutor(triggerParam, address);} else {//获取不到执行地址直接返回triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info 更新日志状态jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();//设置执行触发器返回值jobLog.setTriggerCode(triggerResult.getCode());//设置返回结果信息jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

通过post请求客户端执行job

在这里插入图片描述

第二步、客户端接收调度任务

客户端 通过内嵌服务netty接收,异步线程处理

在这里插入图片描述

EmbedHttpServerHandler 继承了 ChannelInboundHandlerAdapter ,即入站处理器,主要用来读取客户端数据,写回结果

@Override
public ReturnT<String> run(TriggerParam triggerParam) {// load old:jobHandler + jobThread 获取job绑定线程JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());//获取作业处理器IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThread 获取任务运行模式GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());//任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandler 获取job处理器,也就是方法声明IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThread 如果job处理器不一样,则kill旧处理器绑定的线程if (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handler 执行到这儿,要么新旧处理器不一致,要么没有绑定过任何线程if (jobHandler == null) {jobHandler = newJobHandler;//没找到处理器,直接返回异常if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategy 获取任务阻塞策略if (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid) 作业没绑定过线程,则绑定作业到具体线程if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queue 任务丢入触发队列处理ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;
}

绑定作业到具体线程,启动线程

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;
}

任务丢入触发队列处理

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;
}

具体执行触发器逻辑

   @Override
public void run() {// init 执行初始化任务try {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;//统计空闲执行次数idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)//获取触发器任务triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0; //空闲次数重置triggerLogIdSet.remove(triggerParam.getLogId()); /删除logId主要用来判断是否重复执行// log filename, like "logPath/yyyy-MM-dd/9999.log" 写入log文件String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());//设置了超时就异步线程处理if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();//异步线程处理并获取返回值Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just execute 没设置超时时间,则立刻执行触发器handler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) { // 空闲执行次数超过30次,且队列没任务,则删除并终止线程if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// commonm 加入回调队列TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroy 销毁,执行调度器设置的销毁方法try {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

com.xxl.job.core.handler.impl.MethodJobHandler#execute 利用反射调用方法:

@Override
public void execute() throws Exception {Class<?>[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types} else {method.invoke(target);}
}

当服务端调用客户端执行后,会直接返回结果

在这里插入图片描述

private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write responseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}// 写入并刷新ctx.writeAndFlush(response);
}

这里对应着xxl_job_log表trigger_code,200标识调用客户端成功,500标识调用失败(获取不到客户端地址)

第三步、客户端回调线程通过post请求访问 服务端

客户端 返回结果给 服务端当客户端执行完成后,会将处理回调参数任务加入触发器回调处理线程

com.xxl.job.core.thread.JobThread#run

在这里插入图片描述

com.xxl.job.core.thread.TriggerCallbackThread#start 回调调度中心任务执行状态

在这里插入图片描述

第四步、服务端收到回调请求,更改job最终结果

在这里插入图片描述
com.xxl.job.admin.controller.JobApiController#api服务端接口收到请求,调用 com.xxl.job.admin.controller.JobApiController#api 该方法主要就是更新 job日志信息。

private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {// valid log item 查询任务日志XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());if (log == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");}if (log.getHandleCode() > 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc}// handle msgStringBuffer handleMsg = new StringBuffer();if (log.getHandleMsg()!=null) {handleMsg.append(log.getHandleMsg()).append("<br>");}if (handleCallbackParam.getHandleMsg() != null) {handleMsg.append(handleCallbackParam.getHandleMsg());}// success, save log log.setHandleTime(new Date());log.setHandleCode(handleCallbackParam.getHandleCode());log.setHandleMsg(handleMsg.toString());XxlJobCompleter.updateHandleInfoAndFinish(log);return ReturnT.SUCCESS;
}

第五步、服务端针对超时任务设置失败结果

调度中心调用客户端执行job后,客户端正确返回。当超过10分钟,调度中心没有接收到客户端返回的执行结果,进行处理。 com.xxl.job.admin.core.thread.JobCompleteHelper#start

monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}// monitorwhile (!toStop) {try {// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;Date losedTime = DateUtil.addMinutes(new Date(), -10);//调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);if (losedJobIds!=null && losedJobIds.size()>0) {for (Long logId: losedJobIds) {XxlJobLog jobLog = new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );//更改处理状态XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");}
});

通过对xxl-job源码的深入解读和分析,我们不仅获得了对该分布式任务调度平台的全面认识,还深入理解了其内部的工作机制和设计原理。我希望通过本篇博客的内容,您对xxl-job的源码有了更深入的了解,并能够将这些知识应用到实际项目中。不断学习和探索源码,将帮助我们成为更优秀的开发者,拥有解决复杂问题的能力。
感谢您的阅读和关注!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/136840.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

保研复习-计算机组成原理

计算机组成原理 计算机组成冯诺依曼体系结构计算机系统的层次结构计算机的五大组成部件编译和解释的区别 CPUCPU的组成寄存器的类型指令类型指令功能指令执行过程 存储器存储器的层次结构寻址方式 输入和输出io方式有哪几种IO接口的基本结构 计算机组成 冯诺依曼体系结构 存储…

从CNN(卷积神经网络),又名CAM获取热图

一、说明 卷积神经网络&#xff08;CNN&#xff09;令人难以置信。如果你想知道它如何看待世界&#xff08;图像&#xff09;&#xff0c;有一种方法是可视化它。 这个想法是&#xff0c;我们从最后的密集层中得到权重&#xff0c;然后乘以最终的CNN层。这需要全局平均…

每日一题~二叉搜索树中的插入操作

题目链接&#xff1a;701. 二叉搜索树中的插入操作 - 力扣&#xff08;LeetCode&#xff09; 题目描述&#xff1a; 思路分析&#xff1a;由题可知&#xff0c;题目的要求是给我们一个二叉搜索树和一个 val&#xff0c;将这个 val 插入到二叉搜索树中&#xff0c;并且这个树仍…

八、实时时钟

八、实时时钟 简介时钟芯片模块代码可调时钟 简介 引脚定义和应用电路 我们的开发板没有备用电池 寄存器定义 时序定义 在时钟的上升沿&#xff0c;IO口的数据被写入到芯片中&#xff0c;在下降沿&#xff0c;芯片就会将数据输出。如果是写入&#xff0c;那么在整个过程中&…

数据优化与可视化:3D开发工具HOOPS在BIM模型轻量化中的作用分析

在建筑和工程领域&#xff0c;BIM&#xff08;建筑信息建模&#xff09;是一种重要的数字化工具&#xff0c;但大型BIM模型往往需要大量的计算资源和存储空间。为了解决这一问题&#xff0c;HOOPS技术成为了一种关键工具&#xff0c;可以帮助实现BIM模型轻量化&#xff0c;提高…

面试题三:请你谈一谈Vue中的filter功能的实现

Vue中过滤器(filter)的使用 我们想一下有methods为什么要有filter的存在呢&#xff0c;因为filter的实现效率比methods要高的多。 看一下官方定义&#xff1a; Vue.js 允许你自定义过滤器&#xff0c;可被用于一些常见的文本格式化。过滤器可以用在两个地方&#xff1a;双花括号…

【完美解决】GitHub连接超时问题 Recv failure: Connection was reset

问题&#xff1a; 已经开了梯子但是在Idea中使用git&#xff08;GitHub&#xff09;还是连接超时Recv failure: Connection was reset。此时需要让git走代理。 解决方案&#xff1a; 1.对右下角网络点击右键 -> 打开网络和Internet设置 2.代理 -> 查看到地址和端口号…

论文阅读之Learning and Generalization of Motor Skills by Learning from Demonstration

论文阅读其实就是用自己的话讲一遍&#xff0c;然后理解其中的方法 0、论文基本信息 为什么阅读此篇论文&#xff1a;因为它是DMP经典论文&#xff0c;被引多次&#xff0c;学史可以明智&#xff0c;了解最初机理。 论文题目&#xff1a;Learning and Generalization of Moto…

广东白云学院《乡村振兴战略下传统村落文化旅游设计》许少辉八一著作——2023学生开学季辉少许

广东白云学院《乡村振兴战略下传统村落文化旅游设计》许少辉八一著作——2023学生开学季辉少许

手机机型响应式设置2

window.screen.height&#xff1a;屏幕高度 window.innerHeight&#xff1a;视口高度&#xff08;去除浏览器头尾的高度&#xff09; document.body.clientHeight&#xff1a;内容高度 vh&#xff1a;网页视口高度的1/100 vw&#xff1a;网页视口宽度的1/100 vmax&#xff…

GIF动图怎么变成jpg动图?一键分解GIF动画

GIF格式图片怎么转换成jpg格式图片&#xff1f;在日常生活中jpg、png转GIF格式非常的常见&#xff0c;那么gif转换成jpg格式应该怎么操作呢&#xff1f;很简单&#xff0c;给大家分享一款gif动态图片制作&#xff08;https://www.gif.cn/giffenjie&#xff09;工具&#xff0c;…

OpenCV实现的F矩阵+RANSAC原理与实践

1 RANSAC 筛选 1.1 大致原理 Random sample consensus (RANSAC)&#xff0c;即随机抽样一致性&#xff0c;其是一种用于估计模型参数的迭代方法&#xff0c;特别适用于处理包含离群点&#xff08;outliers&#xff09;的数据集 RANSAC 的主要思想是随机采样数据点&#xff0…

前端自定义导出PPT

1、背景 前端导出PPT&#xff0c;刚接触这个需求&#xff0c;还是比较懵逼&#xff0c;然后就在网上查找资料&#xff0c;最终确认是可行的&#xff1b;这个需求也是合理的&#xff0c;我们做了一个可视化数据报表&#xff0c;报表导出成PPT&#xff0c;将在线报表转成文档类型…

02、Servlet核心技术(下)

目录 1 ServletJDBC应用&#xff08;重点&#xff09; 2 重定向和转发&#xff08;重点&#xff09; 2.1 重定向的概述 2.2 转发的概述 3 Servlet线程安全&#xff08;重点&#xff09; 4 状态管理&#xff08;重点 &#xff09; 5 Cookie技术&#xff08;重点&#xf…

操作系统(5-7分)

内容概述 进程管理 进程的状态 前驱图 同步和互斥 PV操作&#xff08;难点&#xff09; PV操作由P操作原语和V操作原语组成&#xff08;原语是不可中断的过程&#xff09;&#xff0c;对信号量进行操作&#xff0c;具体定义如下&#xff1a; P&#xff08;S&#xff09;&#…

Selenium自动化测试 —— 通过cookie绕过验证码的操作

验证码的处理 对于web应用&#xff0c;很多地方比如登录、发帖都需要输入验证码&#xff0c;类型也多种多样&#xff1b;登录/核心操作过程中&#xff0c;系统会产生随机的验证码图片&#xff0c;进行验证才能进行后续操作 解决验证码的方法如下&#xff1a; 1、开发做个万能…

六、展示信息添加 animation 动态效果

简介 给每个信息组件内容添加动画效果,通过 animation 来怎么增强用户浏览时的交互体验。欢迎访问个人的简历网站预览效果 本章涉及修改与新增的文件:App.vue、main.ts、first.vue 、second.vue、third.vue 、fourth.vue 、fifth.vue 一、安装 animae 插件 先安装 animate…

Springboot整合规则引擎

Springboot整合Drools 规则引擎 1.添加maven 依赖坐标&#xff0c;并创建springboot项目 <!-- drools规则引擎 --> <dependency><groupId>org.drools</groupId><artifactId>drools-compiler</artifactId><version>7.6.0.Final<…

JavaScript系列从入门到精通系列第一篇:JavaScript语言简介和它代码初体验

一&#xff1a;简介 1&#xff1a;起源 JavaScript诞生于1995年&#xff0c;它的出现主要是用于处理网页中的前端验证&#xff0c; 所谓的前端验证&#xff0c;就是指检查用户输入的内容是否符合一定的规则。 2&#xff1a;简史 JavaScript是由网景公司发明&#xff0c;起初命…

【SpringMVC】拦截器JSR303的使用

【SpringMVC】拦截器&JSR303的使用 1.1 什么是JSR3031.2 为什么使用JSR3031.3 常用注解1.4 Validated与Valid区别1.5 JSR快速入门1.5.2 配置校验规则# 1.5.3 入门案例二、拦截器2.1 什么是拦截器2.2 拦截器与过滤器2.3 应用场景2.4 拦截器快速入门2.5.拦截器链2.6登录案列权…