深入解析XXL-JOB任务调度执行原理

引言

​ 在分布式系统中,定时任务调度是业务场景中不可或缺的一环。面对海量任务、复杂依赖和高可用性要求,传统单机调度方案逐渐显得力不从心。XXL-JOB作为一款开源的分布式任务调度平台,凭借其轻量级、高扩展性和易用性成为众多企业的选择。本文将深入剖析XXL-JOB的任务执行原理,揭示其背后的设计哲学。

一、架构设计

1、 设计思想

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

2、 系统组成

  • 调度模块(调度中心)
    负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
  • 执行模块(执行器)
    负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
    接收“调度中心”的执行请求、终止请求和日志请求等。

3、 架构图

输入图片说明

4、 quartz的不足

Quartz作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:

  • 问题一:调用API的方式操作任务,不人性化;
  • 问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
  • 问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务;
  • 问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。

XXL-JOB弥补了quartz的上述不足之处。

5、自研调度模块

XXL-JOB最终选择自研调度组件(早期调度组件基于Quartz);一方面是为了精简系统降低冗余依赖,另一方面是为了提供系统的可控度与稳定性;

XXL-JOB中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。

二、源码解析

1、执行器入口

在配置执行器组件时,会配置一个XxlJobSpringExecutor的Bean

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();//设置调用中心的连接地址xxlJobSpringExecutor.setAdminAddresses(adminAddresses);//设置执行器的名称xxlJobSpringExecutor.setAppname(appname);//设置执行器IPxxlJobSpringExecutor.setIp(ip);//执行器端口号xxlJobSpringExecutor.setPort(port);//调度中心通讯tokenxxlJobSpringExecutor.setAccessToken(accessToken);//任务执行日志存放的目录xxlJobSpringExecutor.setLogPath(logPath);//执行器日志文件保存天数xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;
}

XxlJobSpringExecutor这个类就是执行器的入口

该类实现了SmartInitializingSingleton接口,经过Bean的生命周期,会调用afterSingletonsInstantiated这个方法

	public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);GlueFactory.refreshInstance(1);try {super.start();} catch (Exception e) {throw new RuntimeException(e);}}

2、初始化JobHandler

首先来看initJobHandlerMethodRepository这个方法

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}//getBeanNamesForType获取所有beanNames的集合String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = null;//如果是懒加载也就是加了@Lazy注解的,直接跳过Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);if (onBean!=null){logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);continue;}else {//获取当前的beanbean = applicationContext.getBean(beanDefinitionName);}//key为Method方法,值为XxlJob注解的集合Map<Method, XxlJob> annotatedMethods = null;try {//通过selectMethods方法返回map对象annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {//当前method有没有加XxlJob注解return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {//获取当前Entry的key和valueMethod executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();//注册registJobHandler(xxlJob, bean, executeMethod);}}}
2.1 MethodIntrospector.selectMethods()
	public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {//返回的map集合final Map<Method, T> methodMap = new LinkedHashMap<>();Set<Class<?>> handlerTypes = new LinkedHashSet<>();//添加给定类实现的所有接口的集合,相当于把当前类的上一级全部添加进来handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));for (Class<?> currentHandlerType : handlerTypes) {//调用doWithMethods方法,传入函数式接口实现ReflectionUtils.doWithMethods(currentHandlerType, method -> {Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);//调用外层的inspect方法返回结果,也就是XxlJob注解T result = metadataLookup.inspect(specificMethod);if (result != null) {Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {//将method和result放入methodMap中methodMap.put(specificMethod, result);}}}, ReflectionUtils.USER_DECLARED_METHODS);}//最后返回map对象return methodMap;}

来看一下ReflectionUtils.doWithMethods()这个方法

	public static void doWithMethods(Class<?> clazz, MethodCallback mc, @Nullable MethodFilter mf) {//获取到当前类的声明的所有方法数组Method[] methods = getDeclaredMethods(clazz, false);for (Method method : methods) {if (mf != null && !mf.matches(method)) {continue;}try {//调用doWith方法,也就是外层的函数式接口实现mc.doWith(method);}catch (IllegalAccessException ex) {throw new IllegalStateException("Not allowed to access method '" + method.getName() + "': " + ex);}}//对父类或者接口做同样的处理if (clazz.getSuperclass() != null && (mf != USER_DECLARED_METHODS || clazz.getSuperclass() != Object.class)) {doWithMethods(clazz.getSuperclass(), mc, mf);}else if (clazz.isInterface()) {for (Class<?> superIfc : clazz.getInterfaces()) {doWithMethods(superIfc, mc, mf);}}}
2.2 registJobHandler()
    protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}//xxlJob的value值String name = xxlJob.value();Class<?> clazz = bean.getClass();//方法名String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}//如果name存在重复的,则会抛出异常if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}executeMethod.setAccessible(true);//获取init和destroy方法.....// registry jobhandler//这里new了一个MethodJobHandler对象作为参数,后面会详细说明registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}
    //维护的map集合,key是xxljob注解中设置的value值,value是IJobHandler对象private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}//注册JobHandler就是放入到jobHandlerRepository这个map集合中public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}
2.3 JobHandler

JobHandler其实就是一个定时任务的封装,一个定时任务会对应一个JobHandler对象,注册JobHandler时就是new一个JobHandler对象放入到维护的map集合中

当执行器执行任务的时候,就会调用JobHandler的execute方法

JobHandler有三种实现:

  • MethodJobHandler:方法的实现
  • GlueJobHandler:支持动态修改任务执行的代码
  • ScriptJobHandler:处理脚本任务
public class MethodJobHandler extends IJobHandler {@Overridepublic void execute() throws Exception {Class<?>[] paramTypes = method.getParameterTypes();//通过反射调用方法if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]);       } else {method.invoke(target);}}

通过上面解析,MethodJobHandler会在项目启动的时候就会创建,GlueJobHandlerScriptJobHandler都是任务触发时才会创建

3、执行器注册

调用完initJobHandlerMethodRepository方法后,会执行super.start()方法

	public void start() throws Exception {//初始化日志XxlJobFileAppender.initLogPath(logPath);//初始化admin集合initAdminBizList(adminAddresses, accessToken);//初始化日志清理的线程JobLogFileCleanThread.getInstance().start(logRetentionDays);//初始化执行器回调线程TriggerCallbackThread.getInstance().start();//初始化执行器服务端initEmbedServer(address, ip, port, appname, accessToken);}
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {//如果没有指定端口,默认9999端口port = port>0?port: NetUtil.findAvailablePort(9999);//没有指定ip,通过IpUtil.getIp()方法获取ipip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();//生成注册地址,默认使用指定的address,如果没有指定,则使用ip:portif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}embedServer = new EmbedServer();//调用start方法embedServer.start(address, port, appname, accessToken);}
    //创建一个Http服务器,底层是基于Netty实现的,这个Http服务端会接收来自调度中心的请求public void start(final String address, final int port, final String appname, final String accessToken) {//当执行器接收到调度中心的请求时,会把请求交给ExecutorBizImpl来处理//ExecutorBiz还有一个ExecutorBizClient实现类,主要是用来发送http请求,所以这个实现类是在调度中心使用的,用来访问执行器提供的http接口executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {......// start registrystartRegistry(appname, address);......thread.setDaemon(true);thread.start();}
    public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}
//如果admin是集群部署的话,就会向每台服务器调用registry方法区注册
@Override
public ReturnT<String> registry(RegistryParam registryParam) {//通过post接口调用return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
    //JobApiController.java@RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {......if ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if ("registry".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);//调用registry方法return adminBiz.registry(registryParam);} else if ("registryRemove".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}
	//最终注册就是往xxl_job_registry表中插入一条记录public ReturnT<String> registry(RegistryParam registryParam) {// async executeregistryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {//先执行更新语句,如果不存在,则执行插入语句int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {//执行插入方法
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// freshfreshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}

当执行器启动的时候,会启动一个注册线程,这个线程会往调度中心注册当前执行器的信息,包括两部分数据

  • 执行器的名字,也就是设置的appname
  • 执行器所在机器的ip和端口,这样调度中心就可以访问到这个执行器提供的Http接口

4、任务触发

在xxl-job-admin项目中也会配置一个类似于XxlJobSpringExecutor的Bean,叫XxlJobAdminConfig

//实现了InitializingBean接口,在初始化是会执行afterPropertiesSet方法
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}
}
public class XxlJobScheduler  {public void init() throws Exception {// init i18ninitI18n();//初始化admin的任务触发线程池JobTriggerPoolHelper.toStart();//监控和维护注册的执行器实例JobRegistryHelper.getInstance().start();//job任务执行失败处理JobFailMonitorHelper.getInstance().start();//job任务结果丢失处理JobCompleteHelper.getInstance().start();//admin的日志记录JobLogReportHelper.getInstance().start();//job任务触发JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}
}

来看job任务触发的start方法

public void start(){//开启任务线程scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {......//通过数据库实现的分布式锁来保证任务在同一时间只会被其中的一个调度中心触发一次preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();//查询xxl_job_info这张表中下一次执行的时间 <= 当前时间 + 5s的任务,5s是XxlJob写死的,被称为预读时间,提前读出来,保证任务能准时触发//查询到任务之后,调度线程会去将这些任务根据执行时间划分为三个部分://1、当前时间已经超过任务下一次执行时间5s以上的任务//2、当前时间已经超过任务下一次执行时间,但是但不足5s的任务//3、还未到触发时间,但是一定是5s内就会触发执行的List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);for (XxlJobInfo jobInfo: scheduleList) {//超过5秒以上会根据任务配置的调度过期策略来选择要不要执行//调度过期策略就两种,一种直接忽略,一种就是立马执行一次if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {//执行一次JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);}//没有超过5秒则就直接立马执行一次,之后如果判断任务下一次执行时间就在5s内,会直接放到一个时间轮里面,等待下一次触发执行} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);//还没到执行时间,所以不会立马执行,也是直接放到时间轮里面,等待触发执行} else {// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();}}
}
4.1 JobTriggerPoolHelper.trigger()

当任务达到了触发条件,并不是由调度线程直接去触发执行器的任务执行,调度线程会将这个触发的任务交给线程池去执行,并且Xxl-Job为了进一步优化任务的触发,将这个触发任务执行的线程池划分成快线程池慢线程池两个线程池

在调用执行器的Http接口触发任务执行的时候,Xxl-Job会去记录每个任务的触发所耗费的时间,当任务一次触发的时间超过500ms,那么这个任务的慢次数就会加1,如果这个任务一分钟内触发的慢次数超过10次,接下来就会将触发任务交给慢线程池去执行,所以快慢线程池就是避免那种频繁触发并且每次触发时间还很长的任务阻塞其它任务的触发的情况发生

    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {//执行addTrigger方法helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);}public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {//选择快慢线程池ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {//慢线程池triggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);}}});}public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {......//如果执行器路由策略是分片广播的话if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//分片执行for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// 1、save log-id// 2、init trigger-param// 3、init addressString address = null;ReturnT<String> routeAddressResult = null;//分片广播处理if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {//选择路由routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;if (address != null) {//远程调用执行器执行任务triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger info// 6、save log trigger-info}public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {//根据注册地址返回ExecutorBiz对象,这里是用map缓存起来,一个地址对应一个ExecutorBiz,也就是ExecutorBizClient实现类//private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//调用run方法runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}return runResult;}@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {//通过post请求远程调用return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);}
4.2 路由策略

img

XxlJob支持多种路由策略,除了分片广播,其余的具体的算法实现都是通过ExecutorRouter的实现类来实现的

在这里插入图片描述
第一个、最后一个、轮询、随机都比较好理解

  • 一致性Hash:一致性Hash可以理解,就是Hash函数(hashcode%size)的size保持不变,从而保证了Hash函数的前后一致性
  • 最不经常使用(LFU:Least Frequently Used):Xxl-Job内部会有一个缓存,统计每个任务每个地址的使用次数,每次都选择使用次数最少的地址,这个缓存每隔24小时重置一次
  • 最近最久未使用(LRU:Least Recently Used):将地址存到LinkedHashMap中,它利用LinkedHashMap可以根据元素访问(get/put)顺序来给元素排序的特性,快速找到最近最久未使用(未访问)的节点
  • 故障转移:调度中心都会去请求每个执行器,只要能接收到响应,说明执行器正常,那么任务就会交给这个执行器去执行
  • 忙碌转移:调度中心也会去请求每个执行器,判断执行器是不是正在执行当前需要执行的任务(任务执行时间过长,导致上一次任务还没执行完,下一次又触发了),如果在执行,说明忙碌,不能用,否则就可以用
  • 分片广播:XxlJob给每个执行器分配一个编号,从0开始递增,然后向所有执行器触发任务,告诉每个执行器自己的编号和总共执行器的数据,我们可以通过XxlJobHelper#getShardIndex获取到编号,XxlJobHelper#getShardTotal获取到执行器的总数据量,分片广播就是将任务量分散到各个执行器,每个执行器只执行一部分任务,加快任务的处理

5、任务执行

前面在说执行器注册的时候有提到,当执行器接收到调度中心的请求时,会把请求交给ExecutorBizImpl来处理

@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {bizThreadPool.execute(new Runnable() {@Overridepublic void run() {Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); }});}//包括beat、idleBeat、run、kill、log请求都是通过ExecutorBizImpl来实现的private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {try {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run":TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);//最终会调用到ExecutorBizImpl的run方法实现return executorBiz.run(triggerParam);case "/kill":KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}

executorBiz.run(TriggerParam triggerParam),会根据传入的条件构建出IJobHandler和JobThread,最终会放到队列中等待触发

public ReturnT<String> run(TriggerParam triggerParam) {//根据传入的JobId,加载旧的jobHandler和jobThread(如果一个任务已经执行过一次,会存入jobThreadRepository这个本地的Map里)JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;//根据不同的GlueType,构建不同的IJobHandlerGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());//如果类型是Bean的话,则会构建为MethodJobHandlerif (GlueTypeEnum.BEAN == glueTypeEnum) {//直接从map缓存中取,在初始化jobHandler的那一步,都已经全部放入到map中IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//校验jobThread,如果满足以下条件则会终止if (jobThread!=null && jobHandler != newJobHandler) {removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}//校验jobHandlerif (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}//如果类型是GLUE的话,则会构建为GlueJobHandler} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {//如果类型是Script的话,ScriptJobHandler} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}//如果jobThread不为null,则根据阻塞策略进行相应的操作if (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);//如果当前阻塞策略是DISCARD_LATER,也就是丢弃后续调度if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {//jobThread正在运行或在触发队列中,则返回错误信息if (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}//如果当前阻塞策略是COVER_EARLY,也就是覆盖之前调度} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {//jobThread正在运行或在触发队列中,则终止旧的任务线程,并将任务线程设置为 nullif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}//如果是单机串行SERIAL_EXECUTION,则放入队列中触发} else {// just queue trigger}}//如果jobThread任务线程为null,注册一个新的任务线程if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}//将触发参数推送到任务线程的触发队列中,等待执行ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;}

在上面这个方法中,最终会把triggerParam放入TriggerQueue中,那么真正的任务执行是在哪里呢?还是看上面这段代码,XxlJobExecutor.registJobThread这个方法中,注册了一个新的jobThread,并且通过start方法启动了这个线程。

    private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){//new了一个JobThread对象,并且调用start方法启动JobThread newJobThread = new JobThread(jobId, handler);newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}

调用了线程的start方法之后,JobThread的run方法就开始执行。

	@Overridepublic void run() {//toStop不为true则循环调用while(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {//从队列triggerQueue中取TriggerParam对象,最多等待三秒triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());//如果任务设置了超时时间,则new一个FutureTask对象异步执行任务,通过futureTask.get()方法,设置超时时间,然后捕获TimeoutException异常来实现if (triggerParam.getExecutorTimeout() > 0) {Thread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {XxlJobContext.setXxlJobContext(xxlJobContext);//通过该方法真正执行任务handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {//如果没有设置超时时间,直接执行任务handler.execute();}//如果任务执行失败或结果丢失,调用XxlJobHelper.handleFail()方法进行处理//如果任务执行成功,记录执行结果信息if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {//空闲次数计数器idleTimes。如果idleTimes达到阈值(30 次),相当于有30次都没有执行对应的任务,就从执行器缓存中删除这个任务if (idleTimes > 30) {if(triggerQueue.size() == 0) {XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}//处理异常信息StringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {//如果任务线程未被停止,将推送回调信息push到回调线程TriggerCallbackThreadif (!toStop) {TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {//如果任务线程被停止,则推送线程停止信息到回调线程TriggerCallbackThreadTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}//如果线程被停止后触发队列不为空,则推送线程停止信息到回调线程while(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}}
  • JobThread会将任务执行的结果发送到一个内存队列中
  • 执行器启动的时候会开启一个处发送任务执行结果的线程:TriggerCallbackThread
  • 这个线程会不停地从队列中获取所有的执行结果,将执行结果批量发送给调度中心
  • 调用中心接收到请求时,会根据执行的结果修改这次任务的执行状态和进行一些后续的事,比如失败了是否需要重试,是否有子任务需要触发等等

三、推荐阅读

XXL-JOB官网

xxl-job架构原理讲解

XXL-JOB调度算法

xxl-job源码解读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/26286.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为AI聊天工具添加一个知识系统 之127 详细设计之68 编程 核心技术:Cognitive Protocol Language 之1

本文要点 要点 今天讨论的题目&#xff1a;本项目&#xff08;为使用AI聊天工具的两天者加挂一个知识系统&#xff09; 详细程序设计 之“编程的核心技术” 。 source的三个子类&#xff08;Instrument, Agent, Effector&#xff09; 分别表示--实际上actually &#xff0c;…

word转换为pdf后图片失真解决办法、高质量PDF转换方法

1、安装Adobe Acrobat Pro DC 自行安装 2、配置Acrobat PDFMaker &#xff08;1&#xff09;点击word选项卡上的Acrobat插件&#xff0c;&#xff08;2&#xff09;点击“首选项”按钮&#xff0c;&#xff08;3&#xff09;点击“高级配置”按钮&#xff08;4&#xff09;点…

Kafka生产者相关

windows中kafka集群部署示例-CSDN博客 先启动集群或者单机也OK 引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency>关于主题创建 理论…

《Effective Objective-C》阅读笔记(下)

目录 内存管理 理解引用计数 引用计数工作原理 自动释放池 保留环 以ARC简化引用计数 使用ARC时必须遵循的方法命名规则 变量的内存管理语义 ARC如何清理实例变量 在dealloc方法中只释放引用并解除监听 编写“异常安全代码”时留意内存管理问题 以弱引用避免保留环 …

一、对iic类模块分析与使用

bmp280驱动代码 说明&#xff1a; 1、该模块用于获取气压&#xff0c;温度&#xff0c;海拔等数据。 vcc&#xff0c;gnd接电源 sda &#xff0c;scl 接iic通信引脚 2、该模块使用iic通信&#xff0c;通过iic发送请求相关类的寄存器值&#xff0c;芯片获取对应寄存器返回的数据…

辛格迪客户案例 | 祐儿医药科技GMP培训管理(TMS)项目

01 项目背景&#xff1a;顺应行业趋势&#xff0c;弥补管理短板 随着医药科技行业的快速发展&#xff0c;相关法规和标准不断更新&#xff0c;对企业的质量管理和人员培训提出了更高要求。祐儿医药科技有限公司&#xff08;以下简称“祐儿医药”&#xff09;作为一家专注于创新…

汽车低频发射天线介绍

汽车低频PKE天线是基于RFID技术的深度研究及产品开发应用的一种天线&#xff0c;在汽车的智能系统中发挥着重要作用&#xff0c;以下是关于它的详细介绍&#xff1a; 移动管家PKE低频天线结构与原理 结构&#xff1a;产品一般由一个高Q值磁棒天线和一个高压电容组成&#xff…

Java 大视界 -- Java 大数据分布式文件系统的性能调优实战(101)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

全国普通高等学校名单

全国普通高等学校名单 全国普通高等院校&#xff0c;简称“高校”&#xff0c;是指那些提供高等教育的学校&#xff0c;涵盖了大学、独立学院、高等专科学校以及高等职业学校等多种类型。这些机构通过国家普通高等教育招生考试&#xff0c;主要招收高中毕业生&#xff0c;并为…

Vue.js 学习笔记

文章目录 前言一、Vue.js 基础概念1.1 Vue.js 简介1.2 Vue.js 的特点1.3 Vue.js 基础示例 二、Vue.js 常用指令2.1 双向数据绑定&#xff08;v-model&#xff09;2.2 条件渲染&#xff08;v-if 和 v-show&#xff09;2.3 列表渲染&#xff08;v-for&#xff09;2.4 事件处理&am…

【Python】基础语法三

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解Python的函数、列表和数组。 > 毒鸡汤&#xff1a;有些事情&#xff0c;总是不明白&#xff0c;所以我不会坚持。早安! > 专栏选自&#xff…

传奇3光通版手游行会战攻略:团队协作与战术布局详解

戳一戳&#xff1b;了解更多 在《传奇3光通版》手游中&#xff0c;行会战是玩家们展现团队协作与战术布局的重要舞台。下面&#xff0c;我们就来详细解析一下行会战中的团队协作与战术布局攻略。 一、团队协作 ​职业搭配 在行会战中&#xff0c;合理的职业搭配至关重要。一般…

unity学习56:旧版legacy和新版TMP文本输入框 InputField学习

目录 1 旧版文本输入框 legacy InputField 1.1 新建一个文本输入框 1.2 InputField 的子物体构成 1.3 input field的的component 1.4 input Field的属性 2 过渡 transition 3 控件导航 navigation 4 占位文本 placeholder 5 文本 text 5.1 文本内容&#xff0c;用户…

99分巧克力

99分巧克力 ⭐️难度&#xff1a;中等 &#x1f31f;考点&#xff1a;二分 2017省赛真题 &#x1f4d6; &#x1f4da; import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);int n sc.nextInt();i…

Python 基础知识全面总结

Python 是一种广泛应用的编程语言&#xff0c;具有简洁、易读、功能强大等特点。本文将对 Python 的基础知识进行全面梳理&#xff0c;涵盖从入门必备知识到各类模块和编程概念等内容。 一、Python基础语法 &#xff08;一&#xff09;标识符 定义&#xff1a;用于给变量、函…

7.1.1 计算机网络的组成

文章目录 物理组成功能组成工作方式完整导图 物理组成 计算机网络是将分布在不同地域的计算机组织成系统&#xff0c;便于相互之间资源共享、传递信息。 计算机网络的物理组成包括硬件和软件。硬件中包含主机、前端处理器、连接设备、通信线路。软件中包含协议和应用软件。 功…

领域驱动设计:事件溯源架构简介

概述 事件溯源架构通常由3种应用设计模式组成,分别是:事件驱动(Event Driven),事件溯源(Event Source)、CQRS(读写分离)。这三种应用设计模式常见于领域驱动设计(DDD)中,但它们本身是一种应用设计的思想,不仅仅局限于DDD,每一种模式都可以单独拿出来使用。 E…

【AD】3-10 原理图PDF导出

文件—智能PDF 多页原理图导出 导出设置时选择工程&#xff0c;可自行选择导出一页或多页原理图&#xff0c;一般PCB不用导出

半导体制造工艺(二)光刻工艺—掩模版

在上文中我们已经简单概述了光刻工艺的大致流程。接下来将会介绍在光刻工艺中所需用到的必备材料以及设备。例如掩模版、光刻胶、匀胶机、光刻机等等。由于需要保持讲述工艺的完整性以及流畅&#xff0c;每一个都需要涉及&#xff0c;所以每次仅是侧重点不同。此篇主要讲述的是…

ubuntu服务器安装VASP.6.4.3

ubuntu服务器安装VASP.6.4.3 1 安装Intel OneAPI Base Toolkit和Intel OneAPI HPC Toolkit1.1 更新并安装环境变量1.2 下载Intel OneAPI Base Toolkit和Intel OneAPI HPC Toolkit安装包1.3 安装 Intel OneAPI Base Toolkit1.4 安装 Intel OneAPI HPC Toolkit1.5 添加并激活环境…