Introduction
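In distributed systems, scheduled jobs are an indispensable part of many business scenarios. With huge numbers of jobs, complex dependencies and high-availability requirements, traditional single-node scheduling solutions increasingly fall short. XXL-JOB, an open-source distributed job scheduling platform, has been adopted by many companies because it is lightweight, highly extensible and easy to use. This article takes a close look at how XXL-JOB executes jobs and at the design ideas behind it.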
I. Architecture Design
1. Design Philosophy
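Scheduling is abstracted into a shared platform, the "scheduling center". The platform itself carries no business logic and only issues scheduling requests.
Jobs are abstracted into independent JobHandlers, which are managed centrally by "executors". An executor receives scheduling requests and runs the business logic of the corresponding JobHandler.
As a result, "scheduling" and "jobs" are decoupled from each other, which improves the overall stability and extensibility of the system.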
2. System Components
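- Scheduling module (scheduling center):
Manages scheduling information and issues scheduling requests according to the schedule configuration. It contains no business code itself. Decoupling the scheduler from the jobs improves availability and stability, and the scheduler's performance is no longer bounded by the job module.
It offers simple, visual and dynamic management of scheduling information, including creating, updating and deleting jobs, GLUE development and job alerts. All of these operations take effect immediately. It also monitors scheduling results and execution logs, and supports executor failover.
- Execution module (executor):
Receives scheduling requests and executes the job logic. The job module focuses only on executing jobs, which makes development and maintenance simpler and more efficient.
It handles execution, kill and log requests from the scheduling center.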
3. Architecture Diagram
[Figure: XXL-JOB architecture diagram]
4. Shortcomings of Quartz
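Quartz is one of the best-known open-source job schedulers and is often the first choice for job scheduling. In a clustered environment Quartz manages jobs through its API. This avoids some problems, but the following ones remain: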
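- Problem 1: Operating on jobs by calling an API is not user-friendly.
- Problem 2: The business QuartzJobBean must be persisted to the underlying database tables, which makes Quartz highly intrusive.
- Problem 3: The scheduling logic and the QuartzJobBean live in the same project. As the number of jobs grows and their logic gets heavier, the scheduler's performance becomes severely constrained by the business code.
- Problem 4: Under the hood, Quartz acquires a DB lock "preemptively", and whichever node wins the lock runs the job. This can leave node loads very unbalanced. XXL-JOB instead has its executors run jobs in a "cooperative allocation" manner, which makes full use of the cluster and balances the load across nodes.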
XXL-JOB addresses these shortcomings of Quartz.
5. In-house Scheduling Module
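XXL-JOB eventually switched to a self-developed scheduling component (early versions were built on Quartz). One reason was to slim down the system and drop redundant dependencies. The other was to improve its controllability and stability.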
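In XXL-JOB the "scheduling module" and the "job module" are fully decoupled. When the scheduling module dispatches a job, it parses the job's parameters and makes a remote call to the corresponding executor service. This call model resembles RPC: the scheduling center acts as the calling proxy, and the executor provides the remote service.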
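Below is a minimal in-process sketch of this proxy/service split. All names are hypothetical and not part of XXL-JOB: `JobExecutorApi`, `LocalExecutor` and `SchedulerCenter`. A plain method call stands in for the HTTP request that the real scheduling center sends. The point is only that the scheduler knows handler names and executors, never the business code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical contract shared by both sides; in XXL-JOB this role is played by an HTTP API.
interface JobExecutorApi {
    String run(String handlerName, String param);
}

// Executor side: owns the business logic and knows nothing about schedules.
class LocalExecutor implements JobExecutorApi {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String handlerName, Function<String, String> handler) {
        handlers.put(handlerName, handler);
    }

    @Override
    public String run(String handlerName, String param) {
        Function<String, String> handler = handlers.get(handlerName);
        if (handler == null) {
            return "FAIL: job handler [" + handlerName + "] not found";
        }
        return "SUCCESS: " + handler.apply(param);
    }
}

// Scheduler side: only knows which executors serve which appname.
class SchedulerCenter {
    private final Map<String, List<JobExecutorApi>> executorsByApp = new HashMap<>();

    void addExecutors(String appname, List<JobExecutorApi> executors) {
        executorsByApp.put(appname, executors);
    }

    String trigger(String appname, String handlerName, String param) {
        List<JobExecutorApi> executors = executorsByApp.get(appname);
        if (executors == null || executors.isEmpty()) {
            return "FAIL: no executor registered for " + appname;
        }
        // Always the first executor here; XXL-JOB picks one through its routing strategy.
        return executors.get(0).run(handlerName, param);
    }
}
```

For example, register `"demoJobHandler"` as `p -> "hello " + p` on an executor and add that executor under some appname. Calling `trigger(appname, "demoJobHandler", "world")` then returns `"SUCCESS: hello world"`. Swapping the executor's business code never requires touching `SchedulerCenter`.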
II. Source Code Analysis
1. Executor Entry Point
When configuring the executor component, you declare an XxlJobSpringExecutor bean:
```java
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    // address(es) of the scheduling center (xxl-job-admin)
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    // executor name (appname)
    xxlJobSpringExecutor.setAppname(appname);
    // executor IP
    xxlJobSpringExecutor.setIp(ip);
    // executor port
    xxlJobSpringExecutor.setPort(port);
    // access token for communication with the scheduling center
    xxlJobSpringExecutor.setAccessToken(accessToken);
    // directory for job execution log files
    xxlJobSpringExecutor.setLogPath(logPath);
    // number of days to keep executor log files
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    return xxlJobSpringExecutor;
}
```
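XxlJobSpringExecutor is the entry point of the executor. It implements the SmartInitializingSingleton interface, so during the Spring bean lifecycle its afterSingletonsInstantiated method is called once all non-lazy singleton beans have been instantiated: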
```java
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);

    @Override
    public void afterSingletonsInstantiated() {
        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        // start the executor (XxlJobExecutor.start)
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // ......
}
```
2. Initializing JobHandlers
Let's start with initJobHandlerMethodRepository:
```java
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // getBeanNamesForType returns the names of all (singleton) beans
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = null;
        // lazy beans, i.e. beans annotated with @Lazy, are skipped
        Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
        if (onBean != null) {
            logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
            continue;
        } else {
            // get the current bean
            bean = applicationContext.getBean(beanDefinitionName);
        }
        // key: Method, value: its @XxlJob annotation
        Map<Method, XxlJob> annotatedMethods = null;
        try {
            // selectMethods returns that map
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            // the method's @XxlJob annotation, or null if it has none
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods == null || annotatedMethods.isEmpty()) {
            continue;
        }
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            // key and value of the current entry
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // register
            registJobHandler(xxlJob, bean, executeMethod);
        }
    }
}
```
2.1 MethodIntrospector.selectMethods()
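```java
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
    // the map to return
    final Map<Method, T> methodMap = new LinkedHashMap<>();
    Set<Class<?>> handlerTypes = new LinkedHashSet<>();
    Class<?> specificHandlerType = null;
    // the (user) class itself, unless it is a JDK dynamic proxy
    if (!Proxy.isProxyClass(targetType)) {
        specificHandlerType = ClassUtils.getUserClass(targetType);
        handlerTypes.add(specificHandlerType);
    }
    // plus every interface implemented by the class
    handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));
    for (Class<?> currentHandlerType : handlerTypes) {
        final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);
        // doWithMethods passes each method to this lambda (a functional-interface implementation)
        ReflectionUtils.doWithMethods(currentHandlerType, method -> {
            Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
            // call the outer inspect method, i.e. look up the @XxlJob annotation
            T result = metadataLookup.inspect(specificMethod);
            if (result != null) {
                Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
                if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
                    // put the method and its annotation into methodMap
                    methodMap.put(specificMethod, result);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
    }
    // finally return the map
    return methodMap;
}
```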
Now let's look at ReflectionUtils.doWithMethods():
```java
public static void doWithMethods(Class<?> clazz, MethodCallback mc, @Nullable MethodFilter mf) {
    // all methods declared by the current class
    Method[] methods = getDeclaredMethods(clazz, false);
    for (Method method : methods) {
        if (mf != null && !mf.matches(method)) {
            continue;
        }
        try {
            // call doWith, i.e. the lambda passed in by selectMethods
            mc.doWith(method);
        }
        catch (IllegalAccessException ex) {
            throw new IllegalStateException("Not allowed to access method '" + method.getName() + "': " + ex);
        }
    }
    // repeat for the superclass, or for the super-interfaces if clazz is an interface
    if (clazz.getSuperclass() != null && (mf != USER_DECLARED_METHODS || clazz.getSuperclass() != Object.class)) {
        doWithMethods(clazz.getSuperclass(), mc, mf);
    }
    else if (clazz.isInterface()) {
        for (Class<?> superIfc : clazz.getInterfaces()) {
            doWithMethods(superIfc, mc, mf);
        }
    }
}
```
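Together, selectMethods and doWithMethods walk the class and its superclasses, inspect each declared method, and keep the ones carrying a given annotation. The plain-JDK sketch below reproduces that idea without Spring. `@JobMethod` is a hypothetical stand-in for `@XxlJob`. Unlike Spring, the sketch ignores interfaces, bridge methods and proxies, and does not deduplicate overridden methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @XxlJob; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface JobMethod {
    String value();
}

class AnnotatedMethodScanner {
    // Collects annotated methods declared on the class and its superclasses (Object excluded).
    static Map<Method, JobMethod> selectJobMethods(Class<?> type) {
        Map<Method, JobMethod> result = new LinkedHashMap<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                JobMethod ann = m.getAnnotation(JobMethod.class);
                if (ann != null && !m.isSynthetic()) {
                    result.put(m, ann);
                }
            }
        }
        return result;
    }
}

class BaseJobs {
    @JobMethod("baseJob")
    public void baseJob() { }
}

class SampleJobs extends BaseJobs {
    @JobMethod("demoJobHandler")
    public void demo() { }

    public void notAJob() { }
}
```

`selectJobMethods(SampleJobs.class)` finds both `demo` (declared on the class) and `baseJob` (inherited), and skips `notAJob`.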
2.2 registJobHandler()
```java
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {
    if (xxlJob == null) {
        return;
    }
    // the value of the @XxlJob annotation, used as the handler name
    String name = xxlJob.value();
    Class<?> clazz = bean.getClass();
    // method name
    String methodName = executeMethod.getName();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
    }
    // a duplicate name is rejected with an exception
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }
    executeMethod.setAccessible(true);
    // resolve the init and destroy methods
    // ......
    // registry jobhandler
    // a MethodJobHandler is created here as the argument (explained below)
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
```
```java
// the handler repository: key is the value set in @XxlJob, value is the IJobHandler
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

public static IJobHandler loadJobHandler(String name) {
    return jobHandlerRepository.get(name);
}

// registering a JobHandler just means putting it into jobHandlerRepository
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
```
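The registry is just a concurrent map keyed by handler name. The sketch below is hypothetical (`SimpleJobHandler` and `HandlerRegistry` are not XXL-JOB classes). It keeps the same two checks: blank names and duplicate names are rejected. One deliberate difference: it uses `putIfAbsent`, which makes the duplicate check and the insert a single atomic step. The excerpt above instead calls `loadJobHandler` first and then `put`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical minimal handler contract, playing the role of IJobHandler.
interface SimpleJobHandler {
    void execute() throws Exception;
}

class HandlerRegistry {
    private static final ConcurrentMap<String, SimpleJobHandler> REPOSITORY = new ConcurrentHashMap<>();

    static SimpleJobHandler load(String name) {
        return REPOSITORY.get(name);
    }

    // Rejects blank and duplicate names, like registJobHandler(XxlJob, Object, Method).
    static void register(String name, SimpleJobHandler handler) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("jobhandler name invalid");
        }
        // putIfAbsent performs the duplicate check and the insert atomically.
        SimpleJobHandler previous = REPOSITORY.putIfAbsent(name, handler);
        if (previous != null) {
            throw new IllegalStateException("jobhandler[" + name + "] naming conflicts.");
        }
    }
}
```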
2.3 JobHandler
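A JobHandler (a subclass of IJobHandler) wraps one scheduled job. Each job corresponds to one JobHandler object, and registering a JobHandler means creating that object and putting it into the map above.
When the executor runs a job, it calls the JobHandler's execute method.
There are three JobHandler implementations:
- MethodJobHandler: wraps a bean method annotated with @XxlJob
- GlueJobHandler: lets the job's code be changed dynamically (GLUE mode)
- ScriptJobHandler: runs script jobs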
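```java
public class MethodJobHandler extends IJobHandler {
    // ......

    @Override
    public void execute() throws Exception {
        Class<?>[] paramTypes = method.getParameterTypes();
        // invoke the target method via reflection
        if (paramTypes.length > 0) {
            // every argument is passed as null, so the parameters cannot be primitive types
            method.invoke(target, new Object[paramTypes.length]);
        } else {
            method.invoke(target);
        }
    }

    // ......
}
```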
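From the analysis so far, MethodJobHandler objects are created when the application starts, whereas GlueJobHandler and ScriptJobHandler objects are only created when a job is triggered.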
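The reflective call in MethodJobHandler.execute can be reproduced with plain java.lang.reflect. The sketch below uses hypothetical class names, `ReflectiveJobHandler` and `CounterJob`. It also shows a consequence of passing `new Object[paramTypes.length]`: every parameter receives null. A handler method with a primitive parameter would therefore fail with an IllegalArgumentException.

```java
import java.lang.reflect.Method;

// Hypothetical reflective handler in the spirit of MethodJobHandler (not the XXL-JOB class).
class ReflectiveJobHandler {
    private final Object target;
    private final Method method;

    ReflectiveJobHandler(Object target, Method method) {
        this.target = target;
        this.method = method;
        this.method.setAccessible(true);
    }

    void execute() throws Exception {
        int paramCount = method.getParameterCount();
        if (paramCount > 0) {
            // Every argument is null, so parameters must be reference types.
            method.invoke(target, new Object[paramCount]);
        } else {
            method.invoke(target);
        }
    }
}

// Sample "bean" whose methods play the role of @XxlJob-annotated handlers.
class CounterJob {
    int runs;
    String lastArg = "unset";

    void tick() {
        runs++;
    }

    void tickWithArg(String arg) {
        runs++;
        lastArg = arg;
    }
}
```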
3. Executor Registration
After initJobHandlerMethodRepository returns, super.start() is executed:
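```java
public void start() throws Exception {
    // initialize the log path
    XxlJobFileAppender.initLogPath(logPath);
    // initialize the admin client list (one AdminBiz per admin address)
    initAdminBizList(adminAddresses, accessToken);
    // start the log file cleanup thread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // start the callback thread that reports execution results
    TriggerCallbackThread.getInstance().start();
    // start the executor's embedded server
    initEmbedServer(address, ip, port, appname, accessToken);
}
```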
```java
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // if no port is configured, look for an available port starting from 9999
    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    // if no IP is configured, obtain one via IpUtil.getIp()
    ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
    // build the registry address: use the configured address if any, otherwise http://ip:port/
    if (address == null || address.trim().length() == 0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }
    embedServer = new EmbedServer();
    // call start
    embedServer.start(address, port, appname, accessToken);
}
```
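The defaulting rules in initEmbedServer can be sketched with the JDK alone. `ExecutorAddressUtil` is hypothetical. Its `findAvailablePort` only scans upward, trying to bind a `ServerSocket`, which may differ from how XXL-JOB's NetUtil searches. As with any check-then-use approach, another process can grab the port before the real server binds it. IPv6 address formatting is ignored.

```java
import java.io.IOException;
import java.net.ServerSocket;

class ExecutorAddressUtil {
    // Returns the first port >= start that can currently be bound.
    static int findAvailablePort(int start) {
        for (int port = start; port <= 65535; port++) {
            try (ServerSocket probe = new ServerSocket(port)) {
                return port;                 // bind succeeded; the socket is closed on exit
            } catch (IOException inUse) {
                // port already taken, try the next one
            }
        }
        throw new IllegalStateException("no available port from " + start);
    }

    // An explicitly configured port (> 0) wins; otherwise search from 9999.
    static int resolvePort(int configuredPort) {
        return configuredPort > 0 ? configuredPort : findAvailablePort(9999);
    }

    // Builds "http://{ip}:{port}/" when no address is configured.
    static String resolveAddress(String address, String ip, int port) {
        if (address == null || address.trim().isEmpty()) {
            return "http://{ip_port}/".replace("{ip_port}", ip + ":" + port);
        }
        return address;
    }
}
```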
```java
// Creates an HTTP server (built on Netty) that receives requests from the scheduling center
public void start(final String address, final int port, final String appname, final String accessToken) {
    // Requests from the scheduling center are handled by ExecutorBizImpl.
    // ExecutorBiz has another implementation, ExecutorBizClient, which sends HTTP requests;
    // it is used on the scheduling-center side to call the HTTP API exposed by the executor.
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // ......
            // start registry
            startRegistry(appname, address);
            // ......
        }
    });
    thread.setDaemon(true);
    thread.start();
}

public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}
```
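```java
// AdminBizClient.java
// The registry request is sent to the admin over HTTP
// (with a clustered admin, ExecutorRegistryThread iterates over the configured admin addresses)
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    // HTTP POST to the admin's api/registry endpoint
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
```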
```java
// JobApiController.java
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
    // ......
    if ("callback".equals(uri)) {
        List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
        return adminBiz.callback(callbackParamList);
    } else if ("registry".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        // call registry
        return adminBiz.registry(registryParam);
    } else if ("registryRemove".equals(uri)) {
        RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
        return adminBiz.registryRemove(registryParam);
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
    }
}
```
```java
// Registration ultimately writes a row into the xxl_job_registry table
public ReturnT<String> registry(RegistryParam registryParam) {
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // try an UPDATE first; if no row matched, INSERT
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                // insert
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
                // fresh
                freshGroupRegistryInfo(registryParam);
            }
        }
    });
    return ReturnT.SUCCESS;
}
```
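When the executor starts, it launches a registry thread that registers the executor's information with the scheduling center. The registration contains two pieces of data:
- the executor name, i.e. the configured appname
- the IP and port of the executor's machine, so that the scheduling center can reach the HTTP API exposed by the executor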
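On the admin side, the registrations can be modeled as a table keyed by appname and address, with a last-update time per row. The sketch below is hypothetical; `RegistryTable` is not an XXL-JOB class. `registry` plays the role of registryUpdate/registrySave, since a single `put` is an upsert. `aliveAddresses` shows why the timestamp matters. In XXL-JOB the registry thread repeats the registration periodically, and the admin's JobRegistryHelper drops executors that stop refreshing. Here the timeout is just a parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the xxl_job_registry table.
class RegistryTable {
    // key: appname + "|" + address, value: last registration time in ms
    private final Map<String, Long> rows = new ConcurrentHashMap<>();

    // Upsert; returns true when a new row was inserted, false when an existing row was refreshed.
    boolean registry(String appname, String address, long nowMs) {
        return rows.put(appname + "|" + address, nowMs) == null;
    }

    // Addresses of one appname whose last registration is no older than deadTimeoutMs.
    List<String> aliveAddresses(String appname, long nowMs, long deadTimeoutMs) {
        List<String> alive = new ArrayList<>();
        String prefix = appname + "|";
        for (Map.Entry<String, Long> row : rows.entrySet()) {
            if (row.getKey().startsWith(prefix) && nowMs - row.getValue() <= deadTimeoutMs) {
                alive.add(row.getKey().substring(prefix.length()));
            }
        }
        alive.sort(null); // natural order, for a stable result
        return alive;
    }
}
```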
4. Job Triggering
The xxl-job-admin project likewise configures a bean similar to XxlJobSpringExecutor, named XxlJobAdminConfig:
```java
// Implements InitializingBean, so afterPropertiesSet runs during initialization
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

    // ......
}
```
```java
public class XxlJobScheduler {
    public void init() throws Exception {
        // init i18n
        initI18n();
        // start the admin's trigger thread pools
        JobTriggerPoolHelper.toStart();
        // monitor and maintain the registered executor instances
        JobRegistryHelper.getInstance().start();
        // handle failed job executions
        JobFailMonitorHelper.getInstance().start();
        // handle jobs whose results were lost
        JobCompleteHelper.getInstance().start();
        // admin log report statistics
        JobLogReportHelper.getInstance().start();
        // job scheduling (triggering)
        JobScheduleHelper.getInstance().start();
        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
}
```
Here is the start method that triggers jobs, JobScheduleHelper.start():
```java
public void start() {
    // start the schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // ......
            // A database-backed distributed lock ensures a job is triggered by only one scheduling-center node at a time
            preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
            preparedStatement.execute();
            // Query xxl_job_info for jobs whose next trigger time <= now + 5 s. The 5 s is hard-coded in XXL-JOB and is
            // called the pre-read time: jobs are read ahead so that they can fire on time.
            // The fetched jobs are then split into three groups by trigger time:
            // 1. jobs whose next trigger time passed more than 5 s ago
            // 2. jobs whose next trigger time has passed, but by no more than 5 s
            // 3. jobs that are not due yet but will certainly fire within the next 5 s
            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
            for (XxlJobInfo jobInfo : scheduleList) {
                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                    // Group 1: apply the job's misfire strategy, either "do nothing" or "fire once now"
                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                        // fire once
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                    }
                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                    // Group 2: trigger immediately; if the job's following trigger time also falls within 5 s,
                    // it is then put into the time ring to wait for that trigger (not shown in this excerpt)
                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                } else {
                    // Group 3: not due yet, so it is not triggered now but put into the time ring to wait
                    // 1. make ring second
                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
                    // 2. push time ring
                    pushTimeRing(ringSecond, jobInfo.getId());
                    // 3. fresh next
                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                }
            }
            // ......
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();
}
```
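The three-way split and the ring-slot computation are pure arithmetic, so they can be sketched without a database. `PreReadScheduler` is hypothetical. `classify` mirrors the three branches above, and `ringSecond` is the same `(time / 1000) % 60` expression. `drain` models the separate ring thread, which is not shown in this excerpt. As an assumption, it takes the jobs in the current second's slot and also those in the previous slot, so that a slow tick does not skip a slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class PreReadScheduler {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;        // more than 5 s overdue: apply the misfire strategy
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;    // overdue by at most 5 s: trigger immediately
        }
        return Action.PUSH_TO_RING;       // not yet due (the query only returns jobs due within 5 s)
    }

    // Slot index in a 60-slot ring: the second-of-minute of the trigger time.
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    // second-of-minute -> IDs of the jobs due in that second
    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    void pushTimeRing(int second, int jobId) {
        ring.computeIfAbsent(second, s -> new CopyOnWriteArrayList<>()).add(jobId);
    }

    // Removes and returns the jobs in the current slot and in the one before it.
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

For instance, a job due at 125 000 ms lands in slot 5. A job 2 s overdue is triggered immediately, and one 6 s overdue goes through the misfire strategy.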
4.1 JobTriggerPoolHelper.trigger()
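When a job meets its trigger condition, the schedule thread does not call the executor itself. It hands the trigger off to a thread pool. To further optimize triggering, XXL-JOB splits this trigger pool into two: a fast pool and a slow pool.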
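When it calls the executor's HTTP API to trigger a job, XXL-JOB records how long each trigger takes. If a single trigger takes more than 500 ms, the job's slow-trigger count is incremented. Once a job accumulates more than 10 slow triggers within one minute, its subsequent triggers go to the slow pool. The fast/slow split thus keeps jobs that are triggered frequently and take long to trigger from blocking the triggering of other jobs.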
```java
// JobTriggerPoolHelper.java
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    // delegate to addTrigger
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose the fast or the slow pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
        // more than 10 slow triggers in the current minute: use the slow pool
        triggerPool_ = slowTriggerPool;
    }
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // ...... (elided here: the trigger cost is measured from 'start' and, if it exceeds 500 ms,
            // the job's counter in jobTimeoutCountMap is incremented)
        }
    });
}

// XxlJobTrigger.java
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {
    // ......
    // routing strategy is sharding broadcast
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
            && shardingParam == null) {
        // trigger every executor, passing shard index i and the shard total
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
    // 1. save log-id
    // 2. init trigger-param
    // 3. init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // sharding broadcast: the index-th executor
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // other strategies: let the router choose an address
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }
    // 4. trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // remote call to the executor
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }
    // 5. collection trigger info
    // 6. save log trigger-info
}

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        // get the ExecutorBiz (an ExecutorBizClient) for this address; one instance per address is cached in
        // private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // call run
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    return runResult;
}

// ExecutorBizClient.java
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // remote call via HTTP POST
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
```
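The fast/slow decision can be isolated into a small class. This is a hypothetical sketch: `TriggerPoolSelector` returns a pool name instead of a ThreadPoolExecutor. The thresholds are the ones described above: 500 ms per trigger, and more than 10 slow triggers per minute. The counters are cleared whenever the wall-clock minute changes, which is one simple way to keep a per-minute count.

```java
import java.util.HashMap;
import java.util.Map;

class TriggerPoolSelector {
    static final long SLOW_THRESHOLD_MS = 500;  // a trigger slower than this counts as slow
    static final int SLOW_LIMIT = 10;           // more slow triggers than this per minute -> slow pool

    // Guarded by 'this', so a plain HashMap is enough.
    private final Map<Integer, Integer> slowCountByJob = new HashMap<>();
    private long currentMinute = -1;

    synchronized String choosePool(int jobId, long nowMs) {
        resetIfNewMinute(nowMs);
        Integer count = slowCountByJob.get(jobId);
        return (count != null && count > SLOW_LIMIT) ? "slow" : "fast";
    }

    // Called after each trigger with its measured cost.
    synchronized void recordCost(int jobId, long costMs, long nowMs) {
        resetIfNewMinute(nowMs);
        if (costMs > SLOW_THRESHOLD_MS) {
            slowCountByJob.merge(jobId, 1, Integer::sum);
        }
    }

    private void resetIfNewMinute(long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {
            currentMinute = minute;
            slowCountByJob.clear();
        }
    }
}
```

A job that records eleven 800 ms triggers within one minute is routed to `"slow"` for the rest of that minute. Other jobs stay on `"fast"`.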
4.2 Routing Strategies
XXL-JOB supports several routing strategies. Except for sharding broadcast, each algorithm is implemented by an implementation class of ExecutorRouter.
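The first, last, round-robin and random strategies are self-explanatory. The others work as follows:
- Consistent hash: in a hash function of the form hashcode % size, size is kept constant (a fixed-size hash ring) instead of being the current number of executors. The mapping therefore stays consistent when executors join or leave, and only the jobs next to the changed executor move. XXL-JOB hashes the job ID, so a given job keeps going to the same executor as long as the executor list does not change.
- Least Frequently Used (LFU): XXL-JOB keeps a cache that counts, per job, how often each address has been used, and always picks the least-used address. The cache is reset every 24 hours.
- Least Recently Used (LRU): addresses are stored in a LinkedHashMap, which can order its entries by access (get/put). This makes it quick to find the node that was least recently used (accessed).
- Failover: the scheduling center sends a heartbeat request to the executors one by one. The first executor that responds successfully is considered healthy and receives the job.
- Busyover: the scheduling center also queries the executors one by one, checking whether each one is currently running this job. This happens, for example, when a run takes so long that it is still going when the next trigger fires. A busy executor is skipped; otherwise the executor gets the job.
- Sharding broadcast: XXL-JOB numbers the executors from 0 upward and triggers the job on all of them, telling each executor its own index and the total number of executors. Inside a job, the index is available via XxlJobHelper#getShardIndex and the total via XxlJobHelper#getShardTotal. Sharding broadcast spreads the workload over the executors so that each one processes only its share, which speeds up processing.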
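Three of these strategies are easy to sketch with the JDK alone. The code below is illustrative rather than XXL-JOB's ExecutorRouter implementations.
- `routeLru` relies on an access-ordered LinkedHashMap.
- `routeConsistentHash` builds a TreeMap ring with a few virtual nodes per address and a 32-bit hash taken from MD5. The virtual-node key format is an arbitrary choice.
- `belongsToShard` shows one common way to use the shard values. In a real handler, `shardIndex` and `shardTotal` would come from XxlJobHelper#getShardIndex and XxlJobHelper#getShardTotal.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class RouteSketches {
    // LRU: 'lru' must be access-ordered, e.g. new LinkedHashMap<>(16, 0.75f, true).
    static String routeLru(LinkedHashMap<String, String> lru, List<String> addresses) {
        if (addresses.isEmpty()) {
            return null;
        }
        for (String address : addresses) {
            if (!lru.containsKey(address)) {         // containsKey does not count as an access
                lru.put(address, address);
            }
        }
        lru.keySet().retainAll(addresses);           // forget executors that went offline
        String eldest = lru.keySet().iterator().next();
        lru.get(eldest);                             // this access moves it to the most-recent end
        return eldest;
    }

    // 32-bit hash built from the first four bytes of an MD5 digest.
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((d[2] & 0xFF) << 16) | ((d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Consistent hash: the job ID is hashed onto the same fixed-size ring as the executors.
    static String routeConsistentHash(int jobId, List<String> addresses, int virtualNodes) {
        if (addresses.isEmpty()) {
            return null;
        }
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String address : addresses) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(address + "#" + i), address);
            }
        }
        // first virtual node at or after the job's hash, wrapping around to the start of the ring
        SortedMap<Long, String> tail = ring.tailMap(hash(String.valueOf(jobId)));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    // Sharding broadcast: each executor keeps only the items that map to its own shard index.
    static boolean belongsToShard(long itemId, int shardIndex, int shardTotal) {
        return Math.floorMod(itemId, (long) shardTotal) == shardIndex;
    }
}
```

With the executor list `a, b, c`, repeated `routeLru` calls cycle a, b, c, a. Removing an executor that was not chosen for a job does not change that job's consistent-hash target.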
5. Job Execution
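As mentioned in the section on executor registration, when the executor receives a request from the scheduling center, it hands the request to ExecutorBizImpl. The dispatch happens in the embedded server's Netty handler: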
```java
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // ...... (read requestData, uri, httpMethod and the access token from msg)
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // ...... (serialize responseObj to JSON and write the HTTP response)
        }
    });
}

// beat, idleBeat, run, kill and log requests are all implemented by ExecutorBizImpl
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // ...... (validate the HTTP method, uri and access token)
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                // ends up in ExecutorBizImpl.run
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
```
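executorBiz.run(TriggerParam triggerParam), implemented in ExecutorBizImpl, builds the IJobHandler and JobThread from the incoming parameters. It then puts the trigger into a queue, where it waits to be executed: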
```java
public ReturnT<String> run(TriggerParam triggerParam) {
    // Load the existing JobThread and its handler by job ID
    // (a job that has run before is cached in the local jobThreadRepository map)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // build a different IJobHandler depending on the GlueType
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // BEAN type: a MethodJobHandler, taken straight from the map filled while initializing JobHandlers
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // validate the old jobThread: if the handler has changed, drop the old thread
        if (jobThread != null && jobHandler != newJobHandler) {
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        // validate the handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // GLUE type: builds a GlueJobHandler
        // ......
    } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
        // script type: builds a ScriptJobHandler
        // ......
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // if a jobThread already exists, apply the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // DISCARD_LATER (discard later triggers): if the jobThread is running or has queued triggers, return an error
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // COVER_EARLY (cover earlier triggers): if the jobThread is running or has queued triggers,
            // drop it so that a new thread replaces it below (the old one is stopped in registJobThread)
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // SERIAL_EXECUTION: just queue the trigger
        }
    }

    // if there is no jobThread, register (and start) a new one
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push the trigger parameters into the thread's trigger queue to wait for execution
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
```
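The effect of the three block strategies is easier to see in isolation. The model below is hypothetical and single-threaded. `BlockStrategyModel` uses `FakeJobThread` as a stand-in for JobThread, and it keeps only the decision logic of the method above:
- DISCARD_LATER rejects the new trigger while the thread is busy.
- COVER_EARLY stops the busy thread and replaces it with a fresh one.
- SERIAL_EXECUTION simply queues the trigger.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BlockStrategyModel {
    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // Stand-in for JobThread: a "running" flag plus a trigger queue of log IDs.
    static final class FakeJobThread {
        boolean running;
        boolean stopped;
        final Deque<Long> triggerQueue = new ArrayDeque<>();

        boolean isRunningOrHasQueue() {
            return running || !triggerQueue.isEmpty();
        }
    }

    FakeJobThread current = new FakeJobThread();

    // Returns a message shaped like the ReturnT sent back to the scheduling center.
    String run(long logId, Strategy strategy) {
        if (strategy == Strategy.DISCARD_LATER && current.isRunningOrHasQueue()) {
            return "FAIL: block strategy effect: discard later";
        }
        if (strategy == Strategy.COVER_EARLY && current.isRunningOrHasQueue()) {
            current.stopped = true;           // the real code calls toStop() and interrupt() on the old thread
            current = new FakeJobThread();    // registJobThread installs a fresh thread
        }
        // SERIAL_EXECUTION (and every non-blocked case) just queues the trigger.
        current.triggerQueue.add(logId);
        return "SUCCESS";
    }
}
```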
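ExecutorBizImpl.run ends by putting triggerParam into the trigger queue (triggerQueue). So where does the job actually run? The answer is in XxlJobExecutor.registJobThread, which ExecutorBizImpl.run calls: it registers a new JobThread and starts it with start().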
```java
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
    // create a new JobThread and start it
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    // replace the cached thread; an old thread for the same job is stopped and interrupted
    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }
    return newJobThread;
}
```
Once start() has been called, JobThread's run method begins executing:
```java
@Override
public void run() {
    // ......
    // loop until toStop is set
    while (!toStop) {
        running = false;
        idleTimes++;
        TriggerParam triggerParam = null;
        try {
            // take a TriggerParam from triggerQueue, waiting at most 3 seconds
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam != null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());
                // ...... (create the xxlJobContext for this run)

                // If the job has a timeout, run it asynchronously in a FutureTask, wait via futureTask.get(timeout),
                // and treat a TimeoutException as a timeout
                if (triggerParam.getExecutorTimeout() > 0) {
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                // this is where the job actually runs
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);
                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // no timeout configured: run the job directly on this thread
                    handler.execute();
                }

                // a handle code <= 0 means the result was lost, so mark the run as failed;
                // otherwise truncate the result message to at most 50000 characters
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
                            ? tempHandleMsg.substring(0, 50000).concat("...")
                            : tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg());
            } else {
                // Idle counter: each poll waits up to 3 s, so after more than 30 empty polls (roughly 90 s without
                // a trigger) the thread is removed from the executor's jobThreadRepository and stopped
                if (idleTimes > 30) {
                    if (triggerQueue.size() == 0) {
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }
            // record the exception as the failure message
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            XxlJobHelper.handleFail(errorMsg);
            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if (triggerParam != null) {
                if (!toStop) {
                    // thread not stopped: push the result to the callback thread TriggerCallbackThread
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg()));
                } else {
                    // thread stopped: push a "killed" result to TriggerCallbackThread
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]"));
                }
            }
        }
    }

    // after the thread has stopped, report every trigger still left in the queue as killed
    while (triggerQueue != null && triggerQueue.size() > 0) {
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam != null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]"));
        }
    }
    // ......
}
```
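The core loop of JobThread can be reproduced in a self-contained class. It polls the queue with a timeout, runs the handler (optionally bounded by a FutureTask timeout), and retires after too many idle rounds. `MiniJobThread` is a hypothetical simplification with three differences from the real thread:
- It records results in a list instead of calling TriggerCallbackThread.
- It works in milliseconds so it can be tested quickly.
- It simply exits when idle instead of going through removeJobThread.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MiniJobThread extends Thread {
    private final LinkedBlockingQueue<Long> triggerQueue = new LinkedBlockingQueue<>();
    private final Callable<String> handler;
    private final long pollMillis;     // JobThread polls for 3 s
    private final int maxIdle;         // JobThread retires after more than 30 idle polls
    private final long timeoutMillis;  // <= 0 means no executor timeout
    final List<String> results = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;

    MiniJobThread(Callable<String> handler, long pollMillis, int maxIdle, long timeoutMillis) {
        this.handler = handler;
        this.pollMillis = pollMillis;
        this.maxIdle = maxIdle;
        this.timeoutMillis = timeoutMillis;
    }

    void push(long logId) {
        triggerQueue.add(logId);
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            try {
                Long logId = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (logId == null) {
                    // no trigger arrived: retire after too many empty polls
                    if (++idleTimes > maxIdle && triggerQueue.isEmpty()) {
                        toStop = true;
                    }
                    continue;
                }
                idleTimes = 0;
                results.add(logId + ":" + execute());
            } catch (InterruptedException e) {
                toStop = true;
            }
        }
    }

    private String execute() throws InterruptedException {
        if (timeoutMillis <= 0) {
            // no timeout: run directly on this thread
            try {
                return handler.call();
            } catch (Exception e) {
                return "fail";
            }
        }
        // with a timeout: run in a FutureTask on its own thread and wait at most timeoutMillis
        FutureTask<String> task = new FutureTask<>(handler);
        Thread futureThread = new Thread(task);
        futureThread.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "fail";
        } finally {
            futureThread.interrupt();   // ask the handler to stop, as JobThread does
        }
    }
}
```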
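- JobThread pushes each execution result into an in-memory queue.
- When the executor starts, it launches a thread dedicated to reporting execution results: TriggerCallbackThread.
- This thread continuously takes all pending results from the queue and sends them to the scheduling center in batches.
- When the scheduling center receives the request, it updates the execution status of that job run according to the result and handles any follow-up work, such as retrying a failed run or triggering child jobs.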
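The batching pattern behind TriggerCallbackThread can be sketched with a LinkedBlockingQueue. The thread waits for one result, drains everything else that is already queued, and sends the whole list in one request. `CallbackBatcher` is hypothetical: the `Consumer` stands in for the HTTP call to the scheduling center, and `sendOnce` uses a timed poll so that a single round can be tested in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CallbackBatcher {
    private final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    private final Consumer<List<String>> sender;   // stands in for the admin's callback API

    CallbackBatcher(Consumer<List<String>> sender) {
        this.sender = sender;
    }

    // Called by job threads when a run finishes.
    void pushCallBack(String result) {
        callbackQueue.add(result);
    }

    // One round of the callback loop; returns how many results were sent (0 if none arrived in time).
    int sendOnce(long waitMillis) throws InterruptedException {
        String first = callbackQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
        if (first == null) {
            return 0;
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        callbackQueue.drainTo(batch);   // everything else already queued goes into the same request
        sender.accept(batch);
        return batch.size();
    }
}
```

Batching keeps the number of HTTP requests to the scheduling center low when many short jobs finish at about the same time.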