【Java】定时任务 - Timer/TimerTask 源码原理解析

一、背景及使用

日常实现各种服务端系统时,我们一定会有一些定时任务的需求。比如会议提前半小时自动提醒,异步任务定时/周期执行等。那么如何去实现这样的一个定时任务系统呢? Java JDK提供的Timer类就是一个很好的工具,通过简单的API调用,我们就可以实现定时任务。

现在就来看一下java.util.Timer是如何实现这样的定时功能的。

首先,我们来看一下一个使用demo

Timer timer = new Timer(); 
TimerTask task = new TimerTask() { public void run() { System.out.println("executing now!"); } 
};// 延迟 1s 打印一次 
timer.schedule(task, 1000) 
// 延迟 1s 固定时延每隔 1s 周期打印一次
timer.schedule(task, 1000, 1000); 
// 延迟 1s 固定速率每隔 1s 周期打印一次
timer.scheduleAtFixRate(task, 1000, 1000)

基本的使用方法:

  1. 创建一个Timer对象

  2. 创建一个TimerTask对象,在这里实现run方法

  3. 将TimerTask对象作为参数,传入到Timer对象的scheule方法中,进行调度执行。

加入任务的API如下:

  • 一次性任务的API

   // 指定时延后运行// 默认fixed-delay模式,周期时间按上一次执行结束时间计算public void schedule(TimerTask task, long delay) {if (delay < 0)throw new IllegalArgumentException("Negative delay.");sched(task, System.currentTimeMillis()+delay, 0);}// 指定时间点运行public void schedule(TimerTask task, Date time) {sched(task, time.getTime(), 0);}
  • 周期任务的APi:

    // 指定时延后运行,之后以指定周期运行public void schedule(TimerTask task, long delay, long period) {if (delay < 0)throw new IllegalArgumentException("Negative delay.");if (period <= 0)throw new IllegalArgumentException("Non-positive period.");sched(task, System.currentTimeMillis()+delay, -period);}// 指定时间点运行,之后以指定周期运行public void schedule(TimerTask task, Date firstTime, long period) {if (period <= 0)throw new IllegalArgumentException("Non-positive period.");sched(task, firstTime.getTime(), -period);}// 指定时延后运行,之后以指定周期运行// 默认fixedRate模式,周期时间按任务执行开始时间计算public void scheduleAtFixedRate(TimerTask task, long delay, long period) {if (delay < 0)throw new IllegalArgumentException("Negative delay.");if (period <= 0)throw new IllegalArgumentException("Non-positive period.");sched(task, System.currentTimeMillis()+delay, period);}public void scheduleAtFixedRate(TimerTask task, Date firstTime,long period) {if (period <= 0)throw new IllegalArgumentException("Non-positive period.");sched(task, firstTime.getTime(), period);}

可以看到API方法内部都是调用sched方法,其中time参数下一次任务执行时间点,是通过计算得到。period参数为0的话则表示为一次性任务。

二、实现原理

那么我们来看一下Timer内部是如何实现调度的。

2.1、内部结构

先看一下Timer的组成部分:

public class Timer {// 任务队列private final TaskQueue queue = new TaskQueue();// 工作线程,循环取任务private final TimerThread thread = new TimerThread(queue);private final Object threadReaper = new Object() {protected void finalize() throws Throwable {synchronized(queue) {thread.newTasksMayBeScheduled = false;queue.notify(); // In case queue is empty.}}};// Timer的序列号,命名工作线程(静态变量,在启动多个Timer的情况可以用于区分对应的工作线程)private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);  }

Timer有3个重要的模块,分别是 TimerTask, TaskQueue, TimerThread

  • TimerTask,即待执行的任务

  • TaskQueue,任务队列,TimerTask加入后会按执行时间自动排序

  • TimerThread,工作线程,真正循环执行TimerTask的线程

那么,在加入任务之后,整个Timer是怎么样运行的呢?可以看下面的示意图:

2.2、工作线程

  • 创建任务,调用scheule方法
public void schedule(TimerTask task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), -period); 
}
  • 内部调用sched方法

// sched方法的入参是task任务,执行的时间,以及执行周期
private void sched(TimerTask task, long time, long period) {if (time < 0)throw new IllegalArgumentException("Illegal execution time.");// 防止溢出if (Math.abs(period) > (Long.MAX_VALUE >> 1))period >>= 1;// 对queue加锁,避免并发入队synchronized(queue) {if (!thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");// 对task加锁,避免并发修改synchronized(task.lock) {if (task.state != TimerTask.VIRGIN)throw new IllegalStateException("Task already scheduled or cancelled");task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}// 任务入队queue.add(task);/* 如果任务是队列当前第一个任务,则唤醒工作线程这里是因为工作线程处理完上一个任务之后,会 sleep 到下一个 task 的执行时间点。如果有 nextExecutionTime 更早的 task 插队到前面,则需要马上唤醒工作线程进行检查避免 task 延迟执行*/if (queue.getMin() == task)queue.notify();}
}

流程中加了一些锁,用来避免同时加入TimerTask的并发问题。可以看到sched方法的逻辑比较简单,task赋值之后入队,队列会自动按照nextExecutionTime排序(升序,排序的实现原理后面会提到)。

  • 工作线程的mainLoop
public void run() {try {mainLoop();} finally {synchronized(queue) {newTasksMayBeScheduled = false;queue.clear();}}
}/*** 工作线程主逻辑,循环执行*/
private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired; // 标记任务是否应该执行synchronized(queue) {// 如果队列为空,且newTasksMayBeScheduled为true,此时等待任务加入while (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();// 如果队列为空,且newTasksMayBeScheduled为false,说明此时线程应该退出if (queue.isEmpty())break;// 队列不为空,尝试从队列中取task(目标执行时间最早的task)long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {// 校验task状态if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue;}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;// 当前时间 >= 目标执行时间,说明任务可执行,设置taskFired = trueif (taskFired = (executionTime<=currentTime)) {if (task.period == 0) { // period == 0 说明是非周期任务,先从队列移除queue.removeMin();task.state = TimerTask.EXECUTED;} else { // 周期任务,会根据period重设执行时间,再加入到队列中queue.rescheduleMin(task.period<0 ? currentTime   - task.period: executionTime + task.period);}}}if (!taskFired) // 任务为不需执行状态,则等待queue.wait(executionTime - currentTime);}if (taskFired)  // 任务需要执行,则调用task的run方法执行,这里执行的其实就是调用方创建task时候run方法的逻辑task.run();} catch(InterruptedException e) {}}
}

mainLoop的源码中可以看出,基本的流程如下所示

当发现是周期任务时,会计算下一次任务执行的时间,这个时候有两种计算方式,即前面API中的

  • schedule:period为负值,下次执行时间

  • scheduleAtFixedRate:period为正值

queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period);

2.3、优先队列

当从队列中移除任务,或者是修改任务执行时间之后,队列会自动排序。始终保持执行时间最早的任务在队首。 那么这是如何实现的呢?

看一下TaskQueue的源码就清楚了

class TaskQueue {private TimerTask[] queue = new TimerTask[128];private int size = 0;int size() {return size;}void add(TimerTask task) {if (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);queue[++size] = task;fixUp(size);}TimerTask getMin() {return queue[1];}TimerTask get(int i) {return queue[i];}void removeMin() {queue[1] = queue[size];queue[size--] = null;  // Drop extra reference to prevent memory leakfixDown(1);}void quickRemove(int i) {assert i <= size;queue[i] = queue[size];queue[size--] = null;  // Drop extra ref to prevent memory leak}void rescheduleMin(long newTime) {queue[1].nextExecutionTime = newTime;fixDown(1);}boolean isEmpty() {return size==0;}void clear() {for (int i=1; i<=size; i++)queue[i] = null;size = 0;}private void fixUp(int k) {while (k > 1) {int j = k >> 1;if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}private void fixDown(int k) {int j;while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;k = j;}}void heapify() {for (int i = size/2; i >= 1; i--)fixDown(i);}
}

可以看到其实TaskQueue内部就是基于数组实现了一个最小堆 (balanced binary heap), 堆中元素根据 执行时间nextExecutionTime排序,执行时间最早的任务始终会排在堆顶。这样工作线程每次检查的任务就是当前最早需要执行的任务。堆的初始大小为128,有简单的倍增扩容机制。

2.4、其他方法

TimerTask 任务有四种状态:

  • VIRGIN: 任务刚刚创建,还没有schedule

  • SCHEDULED:任务已经schedule,进入到队列中

  • EXECUTED: 任务已执行/执行中

  • CANCELLED:任务已取消

Timer 还提供了cancelpurge方法

  • cancel,清除队列中所有任务,工作线程退出。

  • purge,清除队列中所有状态置为取消的任务。

2.5、常见应用实现

Java的Timer广泛被用于实现异步任务系统,在一些开源项目中也很常见, 例如消息队列RocketMQ的 延时消息/消费重试 中的异步逻辑。

public void start() {if (started.compareAndSet(false, true)) {super.load();// 新建了一个timerthis.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {// ...}// 调用了Timer的scheduleAtFixedRate方法this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) {ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}
}

上面这段代码是RocketMQ的延时消息投递任务 ScheduleMessageService 的核心逻辑,就是使用了Timer实现的异步定时任务。

三、总结

不管是实现简单的异步逻辑,还是构建复杂的任务系统,Java的Timer确实是一个方便实用,而且又稳定的工具类。从Timer的实现原理,我们也可以窥见定时系统的一个基础实现:线程循环 + 优先队列。这对于我们自己去设计相关的系统,也会有一定的启发。

设计亮点个人总结:

  1. 流程中加了一些锁,用来避免TimeTask同时加入TimerTaskQueue、对TimeTsk修改所带来的并发问题

  2. 通过newTasksMayBeScheduled 来控制工作线程对TimerTaskQueue的执行

  3. 采用小根堆算法来对TimerTaskQueue进行优先级排序

  4. 工作线程通过循环来执行TimerTaskQueue的任务,还判断并处理了Task的状态机及流转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189783.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01:2440----点灯大师

目录 一:点亮一个LED 1:原理图 2:寄存器 3:2440的框架和启动过程 A:框架 B:启动过程 4:代码 5:ARM知识补充 6:c语言和汇编的应用 A:代码 B:分析汇编语言 C:内存空间 7:内部机制 二:点亮2个灯 三:流水灯 四:按键控制LED 1:原理图 2:寄存器配置 3:代码 一:点…

汽车操纵稳定性matlab仿真

1、内容简介 略 14-可以交流、咨询、答疑 2、内容说明 汽车操纵稳定性matlab仿真&#xff0c;包含完整的论文 操纵动力学、两自由度 摘要&#xff1a;当今&#xff0c;仿真技术日益广泛地应用于汽车工程领域&#xff0c;操纵稳定性研究越来越多地使用成熟的计算机仿真理论…

MongoDB基础知识~

引入MongoDB&#xff1a; 在面对高并发&#xff0c;高效率存储和访问&#xff0c;高扩展性和高可用性等的需求下&#xff0c;我们之前所学习过的关系型数据库(MySql,sql server…)显得有点力不从心&#xff0c;而这些需求在我们的生活中也是随处可见的&#xff0c;例如在社交中…

高防CDN:护航网络安全的卓越之选

在当今数字化时代&#xff0c;网络攻击与日俱增&#xff0c;为了确保网站和应用程序的稳定运行&#xff0c;高防CDN&#xff08;高防御内容分发网络&#xff09;应运而生。选择高防CDN的理由不仅源于其强大的防护性能&#xff0c;还体现了其与硬件防火墙异曲同工的奥妙。 选择高…

【蓝桥杯软件赛 零基础备赛20周】第3周——填空题

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 文章目录 00. 2023年第14届参赛数据0. 上一周答疑1. 填空…

进亦忧,退亦忧,Github Copilot 集成进入 Visual Studio 带来的思考

开篇想到《岳阳楼记》的结尾&#xff1a; 不以物喜&#xff0c;不以己悲&#xff1b;居庙堂之高则忧其民&#xff1b;处江湖之远则忧其君。是进亦忧&#xff0c;退亦忧。然则何时而乐耶&#xff1f;其必曰&#xff1a;“先天下之忧而忧&#xff0c;后天下之乐而乐”乎。未来30…

【信息安全原理】——传输层安全(学习笔记)

&#x1f4d6; 前言&#xff1a;为保证网络应用&#xff0c;特别是应用广泛的Web应用数据传输的安全性&#xff08;机密性、完整性和真实性&#xff09;&#xff0c;可以在多个网络层次上采取安全措施。本篇主要介绍传输层提供应用数据安全传输服务的协议&#xff0c;包括&…

python异常、模块与包

1.异常 异常&#xff1a;当检测到一个错误时&#xff0c;Python解释器就无法继续执行了&#xff0c;反而出现了一些错误的提示&#xff0c;这就是所谓的“异常”&#xff0c;也就是我们常说的BUG。 1.1捕获异常 基本语法&#xff1a; try:可能发生错误代码 except:如果出现…

数据分析实战 | SVM算法——病例自动诊断分析

目录 一、数据分析及对象 二、目的及分析任务 三、方法及工具 四、数据读入 五、数据理解 六、数据准备 七、模型训练 八、模型应用及评价 一、数据分析及对象 CSV文件——“bc_data.csv” 数据集链接&#xff1a;https://download.csdn.net/download/m0_70452407/88…

阿里云国际站:全球加速GA

文章目录 一、前言 二、阿里云全球加速的概念 三、阿里云全球加速的功能优势 四、阿里云全球加速的原理 五、阿里云全球加速的应用场景 六、写在最后 一、前言 随着互联网的快速发展&#xff0c;网站速度已经成为了用户访问体验的一个重要指标。阿里云加速作为一种新的技…

黑马程序员微服务Docker实用篇

Docker实用篇 0.学习目标 1.初识Docker 1.1.什么是Docker 微服务虽然具备各种各样的优势&#xff0c;但服务的拆分通用给部署带来了很大的麻烦。 分布式系统中&#xff0c;依赖的组件非常多&#xff0c;不同组件之间部署时往往会产生一些冲突。在数百上千台服务中重复部署…

CCNA课程实验-13-PPPoE

目录 实验条件网络拓朴需求 配置实现基础配置模拟运营商ISP配置ISP的DNS配置出口路由器OR基础配置PC1基础配置 出口路由器OR配置PPPOE拨号创建NAT(PAT端口复用) PC1测试结果 实验条件 网络拓朴 需求 OR使用PPPoE的方式向ISP发送拨号的用户名和密码&#xff0c;用户名&#xf…

【案例】超声波测距系统设计

1.1 总体设计 1.1.1 概述 学习了明德扬至简设计法和明德扬设计规范&#xff0c;本人用FPGA设计了一个测距系统。该系统采用超声波进行测量距离再在数码管上显示。在本案例的设计过程中包括了超声波的驱动、三线式数码管显示等技术。经过逐步改进、调试等一系列工作后&#xf…

【数据结构】树与二叉树(十三):递归复制二叉树(算法CopyTree)

文章目录 5.2.1 二叉树二叉树性质引理5.1&#xff1a;二叉树中层数为i的结点至多有 2 i 2^i 2i个&#xff0c;其中 i ≥ 0 i \geq 0 i≥0。引理5.2&#xff1a;高度为k的二叉树中至多有 2 k 1 − 1 2^{k1}-1 2k1−1个结点&#xff0c;其中 k ≥ 0 k \geq 0 k≥0。引理5.3&…

vue,react虚拟dom

Virtual DOM 前言 在传统的Web开发中&#xff0c;直接操作真实的DOM通常是一个昂贵且低效的操作。为了解决这个问题&#xff0c;Virtual DOM&#xff08;虚拟DOM&#xff09;被引入为一个中间层&#xff0c;允许开发者在内存中进行操作&#xff0c;从而避免频繁且不必要的真实D…

深度学习的集体智慧:最新发展综述

一、说明 我们调查了来自复杂系统的想法&#xff0c;如群体智能、自组织和紧急行为&#xff0c;这些想法在机器学习中越来越受欢迎。人工神经网络正在影响我们的日常生活&#xff0c;从执行预测性任务&#xff08;如推荐、面部识别和对象分类&#xff09;到生成任务&#xff08…

git的分支及标签使用及情景演示

目录 一. 环境讲述 二.分支 1.1 命令 1.2情景演练 三、标签 3.1 命令 3.2 情景演示 ​编辑 一. 环境讲述 当软件从开发到正式环境部署的过程中&#xff0c;不同环境的作用如下&#xff1a; 开发环境&#xff1a;用于开发人员进行软件开发、测试和调试。在这个环境中…

【Spring Boot 源码学习】初识 SpringApplication

Spring Boot 源码学习系列 初识 SpringApplication 引言往期内容主要内容1. Spring Boot 应用程序的启动2. SpringApplication 的实例化2.1 构造方法参数2.2 Web 应用类型推断2.3 加载 BootstrapRegistryInitializer2.4 加载 ApplicationContextInitializer2.5 加载 Applicatio…

Codeforces Round 788 (Div. 2) E. Hemose on the Tree(树上构造)

题目 t(t<5e4)组样例&#xff0c;每次给定一个数p&#xff0c; 表示一棵节点数为的树&#xff0c; 以下n-1条边&#xff0c;读入树边 对于n个点和n-1条边&#xff0c;每个点需要赋权&#xff0c;每条边需要赋权&#xff0c; 权值需要恰好构成[1,2n-1]的排列 并且当你赋…

阿里云ACK(Serverless)安装APISIX网关及APISIX Ingress Controller

在k8s上安装apisix全家&#xff0c;通过helm安装很简单&#xff0c;但是会遇到一些问题。 安装 首先登录阿里云控制台&#xff0c;在ACK集群详情页&#xff0c;进入CloudShell&#xff0c;执行下面helm命令安装apisix、apisix-ectd、apisix-dashboard和apisix-ingress-contro…