分布式定时任务系列5:XXL-job中blockingQueue的应用

传送门

分布式定时任务系列1:XXL-job安装

分布式定时任务系列2:XXL-job使用

分布式定时任务系列3:任务执行引擎设计

分布式定时任务系列4:任务执行引擎设计续

 Java并发编程实战1:java中的阻塞队列

引子

 这篇文章的主要目不是讨论XXL-job的使用,而是要通过它的任务线程实现机制来分析java中阻塞队列的应用!

而这一切要从上周某天,公司一个普通的下午说起。

当时一个同事要添加任务,就随口问了一句:“阻塞处理策略选啥?”,我心里一惊,以前没注意过这个地方,每次都是默认的,也就是单机串行

就是下图这个:

 这里面有3个选项:

 众人先是侃侃讨论了半天,后面再去翻了源码来查看,发现了下面的blockingQueue

阻塞队列

什么是阻塞队列

关于java阻塞队列,可以看看java中的阻塞队列。里面例子用到的是有界队列ArrayBlockingQueue,XXL-job里面用的是无界队列LinkedBlockingQueue。不论它们的相似及区别点,最重要的都是拥有阻塞特性。

要分析XXL-job是怎么使用阻塞队列的,就从XXL-job的调度触发机制入手!

什么是XXL-job任务

前面在分布式定时任务系列2:XXL-job使用里面介绍过XXL-job的使用,注册一个任务也很简单,只要在业务代码方法中打上@XxlJob注解即可!下面是官方提供的demo:

/*** 1、简单任务示例(Bean模式)*/@XxlJob("demoJobHandler")public void demoJobHandler() throws Exception {XxlJobHelper.log("XXL-JOB, Hello World.");for (int i = 0; i < 5; i++) {XxlJobHelper.log("beat at:" + i);TimeUnit.SECONDS.sleep(2);}// default success}

开发步骤

1、任务开发:在Spring Bean实例中,开发Job方法;

2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。

3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;

4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; 

任务注册

对于XXL-job来说,是怎么注册并管理这些任务的呢?

在程序启动的时候,xxl-job-core包中的XxlJobSpringExecutor类由于实现了接口SmartInitializingSingleton,在Bean实例化完成时会调用afterSingletonsInstantiated()方法:

@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}}

在initJobHandlerMethodRepository(applicationContext)方法中,现在看一下这个方法的代码:该方法会提取所有SpringBean的实例中方法上添加了XxlJob注解的方法,并进行注册!

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();// registregistJobHandler(xxlJob, bean, executeMethod);}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}

注册的方法就是registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod)),会将XxlJob注解标记的方法,提取出对应的方法名,通过反射得到Method信息并实例为对应的MethodJobHandler,最后通过ConcurrentMap来管理,其中key是注任务名称,value为JobHandler实例。

 看到这里,不知你有没有一个疑问:这里的JobHandler里面并没有阻塞队列BlockingQueue,它们是怎么关联起来的呢?所以这里就要讨论一下XXL-job的任务执行!

XXL-job的任务执行

在后台管理手动触发一下测试任务,并把代码打上debug断点:

 首先会发送后台请求:

一路跟踪下去,会进入到XxlJobExecutor.runExecutor(TriggerParam triggerParam, String address)方法,最终是到ExecutorBizImpl.run(TriggerParam triggerParam)方法:

public ReturnT<String> run(TriggerParam triggerParam) {// load old:jobHandler + jobThreadJobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThreadGlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategyif (jobThread != null) {ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);return pushResult;}

 任务执行过程中,如果发现XXL-job对应的执行线程不存在,会创建一个新new Thread实例,并绑定前面的JobHandler实例,这样对于每一个任务都有一个单独的线程,也就将执行线程JobThread跟任务关联起来了:

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;// 初始化阻塞队列this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());// assign job thread namethis.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());}

而执行任务的最后一步,就是将当前任务入队到刚才的执行线程的BlockingQueue中:

// push data to queueReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

 而这也就跟文章最前面的代码关联起来了:

/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}

BlockingQueue的应用

消息入队

至此,就可以看出任务触发的大致逻辑了,用户在admin手工触发任务之后最终会放入到任务对应的队列之中:通过add方法将当前执行参数入队(FIFO),不阻塞队列,如果满了就抛出异常(这也是一个比较大的隐患,如果触了过于频繁,可能会导致OOM,而这也就是为什么要设计不同阻塞策略的原因)。

消息消费

而这些触发的任务怎么消费的呢?其实也是在上面的提到的执行线程JobThread:

  • 当实例化一个线程的时候(com.xxl.job.core.executor.XxlJobExecutor类),会立即启动:调用线程的start()方法

  • 启动之后会调用线程的run()方法(com.xxl.job.core.thread.JobThread类)

  • 在执行线程中,会写一个while的无限循环来不停的从阻塞队列中取出任务

 

  • 循环结束的条件就是上面的阻塞队列:如果是覆盖之前调度将会结束循环(com.xxl.job.core.biz.impl.ExecutorBizImpl类)

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92416.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二.net core 自动化发布到docker (Jenkins安装之后向导)

目录 ​​​​​​​​​​​​​​ 参考资料&#xff1a;https://www.jenkins.io/doc/book/installing/docker/#setup-wizard Post-installation setup wizard.(安装后安装向导) 基于上一篇文章安装&#xff0c;在安装并运行Jenkins&#xff08;不包括使用Jenkins Opera…

日志采集分析ELK

这里的 ELK其实对应三种不同组件 1.ElasticSearch&#xff1a;基于Java&#xff0c;一个开源的分布式搜索引擎。 2.LogStash&#xff1a;基于Java&#xff0c;开源的用于收集&#xff0c;分析和存储日志的工具。&#xff08;它和Beats有重叠的功能&#xff0c;Beats出现之后&a…

6.3 社会工程学攻击

数据参考&#xff1a;CISP官方 目录 社会工程学攻击概念社会工程学攻击利用的人性 “弱点”典型社会工程学攻击方式社会工程学攻击防护 一、社会工程学攻击概念 什么是社会工程学攻击 也被称为 "社交工程学" 攻击利用人性弱点 (本能反应、贪婪、易于信任等) 进…

Reis过期删除策略

介绍 在Redis中&#xff0c;我们可以为键值对设置有效期&#xff0c;现在面临一个问题&#xff0c;如果一个键值对过期了&#xff0c;那么我们应该怎么删除呢&#xff1f; 我们目前有三种方案&#xff1a; 定时删除&#xff1a;在设置键的过期时间的同时&#xff0c;为此键设…

小游戏扫雷实现教学(详解)

目录 【前言】 一、模块化程序设计&#xff08;多文件编程&#xff09;介绍 1.概述 2.传统编程的方式 3.模块化程序设计的方法 二、扫雷代码设计思路 三、扫雷代码设计 1.创建菜单函数 2.实现9x9扫雷 3.初始化棋盘 4.打印棋盘 5.随机布置雷的位置 6.排查雷的信息 7.回…

【Apollo】推动创新:探索阿波罗自动驾驶的进步(含安装 Apollo的详细教程)

前言 Apollo (阿波罗)是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 开放能力、共享资源、加速创新、持续共赢是 Apollo 开放平台的口号。百度把自己所拥有的强大、…

2023年7月京东冰箱行业品牌销售排行榜(京东运营数据分析)

作为日常使用的大家电之一&#xff0c;如今我国冰箱产业已渐趋饱满&#xff0c;市场增长有限。今年上半年&#xff0c;冰箱市场整体销额同比去年来看勉强保持小幅增长。不过&#xff0c;7月份&#xff0c;冰箱大盘的销售表现就略显萧条了。 根据鲸参谋电商数据分析平台的相关数…

NPM与外部服务的集成(下)

目录 1、撤消访问令牌 2、在CI/CD工作流中使用私有包 2.1 创建新的访问令牌 持续整合 持续部署 交互式工作流 CIDR白名单 2.2 将令牌设置为CI/CD服务器上的环境变量 2.3 创建并签入特定于项目的.npmrc文件 2.4 令牌安全 3、Docker和私有模块 3.1 背景&#xff1a;运…

Android的学习系列之Android Studio Setup安装

Android的学习系列之Android Studio Setup安装 [TOC](Android的学习系列之Android Studio Setup安装) 前言Android平台搭建总结 前言 还是项目需要&#xff0c;暂时搭建安卓的运行平台。 Android平台搭建 安装包 双击安装包&#xff0c;进入安装。 下一步 根据自己需求&a…

React源码解析18(7)------ 实现事件机制(onClick事件)

摘要 在上一篇中&#xff0c;我们实现了useState的hook&#xff0c;但由于没有实现事件机制&#xff0c;所以我们只能将setState挂载在window上。 而这一篇主要就是来实现事件系统&#xff0c;从而实现通过点击事件进行setState。 而在React中&#xff0c;虽然我们是将事件绑…

机器学习笔记 - 基于PyTorch + 类似ResNet的单目标检测

一、获取并了解数据 我们将处理年龄相关性黄斑变性 (AMD) 患者的眼部图像。 数据集下载地址,从下面的地址中,找到iChallenge-AMD,然后下载。 Baidu Research Open-Access Dataset - DownloadDownload Baidu Research Open-Access Datasethttps://ai.baidu.com/bro…

MAVEN利器:一文带你了解MAVEN以及如何配置

前言&#xff1a; 强大的构建工具——Maven。作为Java生态系统中的重要组成部分&#xff0c;Maven为开发人员提供了一种简单而高效的方式来构建、管理和发布Java项目。无论是小型项目还是大型企业级应用&#xff0c;Maven都能帮助开发人员轻松处理依赖管理、编译、测试和部署等…

禾赛科技Q2营收交付双新高,国产激光雷达从量变到质变

随着2022年激光雷达元年、2023年城市智能辅助驾驶&#xff08;NOA&#xff09;元年相继到来&#xff0c;激光雷达产业迎来爆发期。 今年以来&#xff0c;自动驾驶公司、汽车制造商以及移动出行公司等各路人马积极推动城市级别的智能辅助驾驶全面落地&#xff0c;北京、上海、深…

Windows - UWP - 网络不好的情况下安装(微软商店)MicrosoftStore的应用

Windows - UWP - 网络不好的情况下安装&#xff08;微软商店&#xff09;MicrosoftStore的应用 前言 UWP虽然几乎被微软抛弃了&#xff0c;但不得不否认UWP应用给用户带来的体验。沙箱的运行方式加上微软的审核&#xff0c;用户使用起来非常放心&#xff0c;并且完美契合Wind…

【elasticSearch系】3.完整搭建详尽版elk

话不多说,我们先看下经典的elk 是由哪些组件搭建组合起来的 elasticSearch和kibana搭建 可以查看之前我搭建elasticsearch和kibana 的这篇文章 logstash搭建 为了和之前我搭建elasticsearch和kibana版本保持一致,这里我们还是选择7.17.3 下载地址 点击下载,这里为了方…

207、仿真-51单片机脉搏心率与血氧报警Proteus仿真设计(程序+Proteus仿真+配套资料等)

毕设帮助、开题指导、技术解答(有偿)见文未 目录 一、硬件设计 二、设计功能 三、Proteus仿真图 四、程序源码 资料包括&#xff1a; 需要完整的资料可以点击下面的名片加下我&#xff0c;找我要资源压缩包的百度网盘下载地址及提取码。 方案选择 单片机的选择 方案一&a…

Leetcode-每日一题【剑指 Offer 31. 栈的压入、弹出序列】

题目 输入两个整数序列&#xff0c;第一个序列表示栈的压入顺序&#xff0c;请判断第二个序列是否为该栈的弹出顺序。假设压入栈的所有数字均不相等。例如&#xff0c;序列 {1,2,3,4,5} 是某栈的压栈序列&#xff0c;序列 {4,5,3,2,1} 是该压栈序列对应的一个弹出序列&#xf…

phpstorm配置ftp同步文件到服务器

这里的默认快捷键 不是 CtrlS &#xff1b;需要设置快捷键&#xff0c;这里原来是save all操作时上传文件到服务器&#xff1b; ** 设置好快捷键后按 CtrlS就会同步文件&#xff08;添加删除文件后保存&#xff0c;服务器也会同步&#xff09; ** 搜索出save all 后&#xf…

FreeRTOS模板-开启资源追踪

目录 开启宏定义 使用API函数 演示效果 测试代码 开启宏定义 #define configUSE_TRACE_FACILITY 1 //TODO 查看任务状态#ifndef INCLUDE_uxTaskGetStackHighWaterMark#define INCLUDE_uxTaskGetStackHighWaterMark 1 //TODO 开启堆栈使用剩余量的检测 #…

一种多策略下RabbitMQ的延时队列实现

1.为什么会用到延时队列? 场景: 最近在开发一款系统中遇到这样一个场景,A系统开通套餐需要把套餐信息以邮件的形式发送给相关工作人员,经过人工审核通过后,在B系统里面开通,A系统会调B系统套餐列表接口查询套餐是否开通成功,开通成功则从A系统去完成订单,假如超过设定时间未开…