图解java.util.concurrent并发包源码系列——深入理解定时任务线程池ScheduledThreadPoolExecutor

深入理解定时任务线程池ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor作用与用法
  • ScheduledThreadPoolExecutor内部执行流程
  • DelayedWorkQueue
  • ScheduledFutureTask
  • 源码分析
    • 任务提交
      • ScheduledFutureTask的属性和方法
      • delayedExecute(t)
    • 任务执行
      • ScheduledFutureTask.super.run()
      • ScheduledFutureTask.super.runAndReset()
      • setNextRunTime()
      • reExecutePeriodic(outerTask)
    • DelayedWorkQueue
      • 小顶堆
      • DelayedWorkQueue成员变量
      • void add(Runnable e)
        • void grow()
        • void siftUp(int k, RunnableScheduledFuture<?> key)
      • RunnableScheduledFuture<?> take()
        • RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f)
          • void siftDown(int k, RunnableScheduledFuture<?> key)
  • 总结

ScheduledThreadPoolExecutor作用与用法

ScheduledThreadPoolExecutor是一个用于执行定时任务或延时任务的线程池,提交到该线程池中的任务会等待执行时间到了才会被执行。与此相对的是ThreadPoolExecutor,提交到ThreadPoolExecutor中的任务,只要ThreadPoolExecutor中有空闲线程,就会被马上执行,如果ThreadPoolExecutor中没有空闲线程,则会把任务放入队列。而提交到ScheduledThreadPoolExecutor中的任务,不管此时ScheduledThreadPoolExecutor有没有空闲线程,任务都会被放入到队列里去,等待任务执行时间到期时被线程从队列中取出并执行。

除此以外,ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor的,因此ThreadPoolExecutor有的东西ScheduledThreadPoolExecutor也有。比如任务队列 BlockingQueue<Runnable> workQueue,工作者线程集合 HashSet<Worker> workers。

在这里插入图片描述

ScheduledThreadPoolExecutor一般可以运用在一些非实时性或者非交互性的场景。比如微服务的注册中心就可以通过定时任务扫描没有在规定时间之内续约的服务,将其下线。

在这里插入图片描述

提交到ScheduledThreadPoolExecutor的任务有两种类型,一种时定时任务,一种时延时任务。我们可以调ScheduledThreadPoolExecutor不同的方法来提交不同类型的任务。

如果要提交的任务是延时任务,我们可以调用ScheduledThreadPoolExecutor的schedule方法:

    public ScheduledFuture<?> schedule(Runnable command, // 要执行的任务,不带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,  // 要执行的任务,带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位

在这里插入图片描述

如果要提交定时任务,可以调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法:

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, // 要执行的任务,不带返回值long initialDelay, // 延迟时间long period, // 执行周期TimeUnit unit) // 时间单位

在这里插入图片描述

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, // 要执行的任务,不带返回值long initialDelay, // 初始延迟时间long delay, // 后面每次执行完上一次任务,延迟多久执行下一次任务TimeUnit unit) // 时间单位

在这里插入图片描述

scheduleAtFixedRate方法以固定的时间周期period运行任务,前一次任务开始执行后,下一次的任务会在固定的周期period之后再执行。而scheduleWithFixedDelay方法是每次执行完上一次的任务,会延迟一定的时间delay之后再执行下一次的任务。

ScheduledThreadPoolExecutor内部执行流程

  1. 提交到ScheduledThreadPoolExecutor中的任务Runnable,会被封装为一个Task,该Task会被入队列
  2. 然后检查ScheduledThreadPoolExecutor中是否有工作者线程,没有的话要新建一个工作者线程,添加到ScheduledThreadPoolExecutor中的工作者线程集合中
  3. ScheduledThreadPoolExecutor后台会有工作者线程从任务队列中拉取任务,但是ScheduledThreadPoolExecutor中的队列是一个延时队列,所以队列中的任务只有到了执行时间之后才会被执行。
  4. ScheduledThreadPoolExecutor中的延时队列内部是一个小顶堆结构,任务会按照到期时间从小到大进行排序,堆顶是最早到期的任务。
  5. 如果一个任务它是定时任务,那么它被执行完以后,会更新下一次的到期时间,然后重新放回到队列

在这里插入图片描述

DelayedWorkQueue

ScheduledThreadPoolExecutor有一个内部类DelayedWorkQueue,他是一个延时阻塞队列。DelayedWorkQueue用于存放提交到ScheduledThreadPoolExecutor中的任务,DelayedWorkQueue内部使用一个小顶堆存储提交到ScheduledThreadPoolExecutor中的任务,放入DelayedWorkQueue中的任务,会按照到期时间从小到大进行排序,堆顶元素是最早到期的元素。

小顶堆是一个特殊的二叉树。这颗二叉树中的每个父节点,都比它的两个字节点要小。其次这颗二叉树不是真的用二叉树的结构来实现的,而是用一个数组实现的,也就是用一个数组去模拟一个符合小顶堆结构的二叉树。比如父节点是数组下标n,那么左子节点的数组下标是2n+1,右子节点的下标就是2n+2。

每次往DelayedWorkQueue中放入任务时,都会从堆底往堆顶做向上调整。每次从DelayedWorkQueue中获取任务后,堆底任务会被提到堆顶,然后从堆顶到堆底做一次堆的向下调整。

在这里插入图片描述

DelayedWorkQueue中有一个Thread类型的leader变量存放等待堆顶任务到期的线程。因为ScheduledThreadPoolExecutor中可能有许多线程,线程从ScheduledThreadPoolExecutor中获取任务是从DelayedWorkQueue里面的堆顶获取,但是堆顶任务只能由一个线程执行,那么该由哪个线程执行呢?那就是leader线程是谁就由谁执行。

那么其他非leader的线程呢?这些线程就要在DelayedWorkQueue内部的一个Condition类型的条件队列available中进行等待。当一个线程成功从DelayedWorkQueue中取走一个任务时,会唤醒available中的一个线程,此时这个线程就可以去竞争当上leader线程了。

在这里插入图片描述

ScheduledFutureTask

我们提交到ScheduledThreadPoolExecutor中的任务都是Runnable类型的。但是ScheduledThreadPoolExecutor需要标记这些Runnable对象什么时候到期被执行,并且这些Runnable之间要互相比较到期时间,好让它们在DelayedWorkQueue的堆中被从小到大排序。因此ScheduledThreadPoolExecutor使用了ScheduledFutureTask类型去封装被提交进来的Runnable对象。

ScheduledFutureTask对象使用一个time属性记录下一次执行的时间,使用一个sequenceNumber记录自己的序号。ScheduledFutureTask重写了Comparable接口的compareTo方法。compareTo方法与其他任务比较时,首先会比较time属性谁更小,谁的到期时间就更早,那么在堆中的排序就越靠近堆顶,如果time属性相等,则比较谁的sequenceNumber更小。

ScheduledFutureTask还有一个period属性记录执行任务的时间间隔,这个属性可以用于计算下一次的执行时间time。

在这里插入图片描述

源码分析

下面我们进入ScheduledThreadPoolExecutor的内部,阅读ScheduledThreadPoolExecutor的源码,了解它的运行逻辑和核心原理。

任务提交

我们以ScheduledThreadPoolExecutor的scheduleAtFixedRate方法为例,看看任务被提交到ScheduledThreadPoolExecutor之后是怎么处理。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// step1ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// step2delayedExecute(t);return t;}

总体上分两步。

第一步(step1)是把我们传进来的Runnable封装为一个ScheduledFutureTask对象。封装为ScheduledFutureTask对象是为了更方便的计算任务的执行时间,以及在堆中的排序。

ScheduledFutureTask利用一个time属性记录下一次任务的执行时间,调用ScheduledFutureTask的setNextRunTime()方法会自动计算下一次的执行时间并更新time属性。ScheduledFutureTask实现了Comparable接口,compareTo方法会比较两个任务的执行时间谁更快到期,谁的排序就更优先。

然后第二步(step2)就是调用delayedExecute(t),把ScheduledFutureTask对象放入到ScheduledThreadPoolExecutor的队列中,等待执行时间到期被ScheduledThreadPoolExecutor中的Worker线程取走并执行。

ScheduledFutureTask的属性和方法

既然第一步把我们传进来的Runnable对象封装为了一个ScheduledFutureTask,我们看看ScheduledFutureTask内部到底有什么。

    private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 序列号,每个ScheduledFutureTask都有一个唯一的序列号,顺序递增private final long sequenceNumber;// 下一次的执行时间private long time;// 执行周期private final long period;// 当前任务执行完后,重新放回队列的任务,一般就是当前任务本身RunnableScheduledFuture<V> outerTask = this;// 当前任务在DelayedWorkQueue的堆数组中的下标int heapIndex;}

以上是ScheduledFutureTask的属性。

sequenceNumber是ScheduledThreadPoolExecutor分配给当前ScheduledFutureTask的序列号,作用就是在比较两个任务的排序优先级时,如果time属性相同,会进一步拿sequenceNumber进行比较

time记录的是当前ScheduledFutureTask下一次执行的时间。

period是任务的执行周期,用于计算下一次的执行时间,计算结果会赋值给time。

outerTask就是当前的ScheduledFutureTask对象自己,用于重回队列时作为参数传递。如果我们设置outerTask为其他的ScheduledFutureTask对象,那么下一次执行的就是不同的任务。如果不做修改的话,就是当前ScheduledFutureTask对象自己。

heapIndex是记录当前ScheduledFutureTask对象在DelayedWorkQueue的堆数组中的下标。有了heapIndex属性之后,就可以很快速的从堆数组中找到对应的ScheduledFutureTask对象,比如我们判断一个ScheduledFutureTask对象是否在堆中,就可以拿到ScheduledFutureTask的heapIndex属性,从堆数组中取出heapIndex下标对应的ScheduledFutureTask对象,判断两个对象是否相等,相等表示当前ScheduledFutureTask对象在堆中,而DelayedWorkQueue的contains方法正是这样的逻辑。

delayedExecute(t)

Runnable对象封装为ScheduledFutureTask对象后,下一步就是要把它放入到队列中,并检查ScheduledThreadPoolExecutor中是否有线程,如果没有要创建一个,保证当前任务有线程执行它。

    private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {// step2.1super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// step2.2ensurePrestart();}}

非核心逻辑不看,就看重点的两行代码。

step2.1:super.getQueue().add(task); 就是把当前ScheduledFutureTask对象放入队列中。super.getQueue()是调用父类ThreadPoolExecutor的getQueue()获取线程池中的阻塞队列,ScheduledThreadPoolExecutor的构造方法创建的队列是DelayedWorkQueue,所以这里获取到的时DelayedWorkQueue。然后调用DelayedWorkQueue的add方法把ScheduledFutureTask对象放入DelayedWorkQueue中。

step2.2:ensurePrestart()在任务放入队列之后被调用,用于检测ScheduledThreadPoolExecutor是否需要创建线程,如果需要的话会创建一个线程。

在这里插入图片描述

由于DelayedWorkQueue的add方法比较复杂,我们放到后面再看,先看完大体流程。这里我们暂时先理解为放入队列中的任务会按到期执行时间从小到大排好序。

下面看看ensurePrestart()方法如何判断是否需要创建线程。

    void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}

如果ScheduledThreadPoolExecutor中的线程数小于核心线程数,那不管ScheduledThreadPoolExecutor中有没有已经创建的线程,都会再创建一个线程,目的是为了让ScheduledThreadPoolExecutor尽快达到核心线程数。如果已经达到了核心线程数,那么就不会再创建线程。

下面的 else if 分支是当我们设置ScheduledThreadPoolExecutor的核心线程数为0时,会进入的一个逻辑,那么就看ScheduledThreadPoolExecutor中有没有线程,没有就创建,有就不创建。

在这里插入图片描述

创建线程的方法是addWorker方法,这个是ThreadPoolExecutor的方法,会创建一个Thread线程对象,并包装成Worker对象放入到一个 HashSet<Worker> workers 集合中,最后会调用Thread对象的start()方法启动线程。

线程启动后会在一个while循环中不停地从队列中拉取任务并执行,拉取任务就是调用BlockingQueue的take()方法,这里就是DelayedWorkQueue的take()方法。

在这里插入图片描述

由于addWorker方法是ThreadPoolExecutor中的方法,不是本篇文章的重点,而且ThreadPoolExecutor的源码是非常简单的,应该是新手都能看得懂,这里就不展开分析了。

至于DelayedWorkQueue的take()方法稍微有点复杂,我们也是放到后面进行分析。这里我们暂时先理解线程会等待队列中最早到期的任务到期后取走。

任务执行

当任务到期被leader线程从队列中取出后,任务ScheduledFutureTask的run()方法就会被执行。

        public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 延时任务else if (!periodic)ScheduledFutureTask.super.run();// 定时任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}}

如果是延时任务,那么之后执行一次,不会重回队列,直接调用父类的run()方法运行任务,然后会进入到FutureTask的run()方法中。如果是定时任务,会周期性的运行,调用父类的runAndReset()方法进入到FutureTask的runAndReset()方法,然后调用setNextRunTime()设置下一次的运行时间,然后调用reExecutePeriodic(outerTask)方法把任务重回队列。

在这里插入图片描述

ScheduledFutureTask.super.run()

ScheduledFutureTask的父类是FutureTask,ScheduledFutureTask.super.run() 会进入 FutureTask的run()方法。

    public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 执行Callable的call()方法,正在执行任务result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

FutureTask的run()方法调用Callable的call()方法真正执行任务,这里执行的不是Runnable的run()方法,是因为我们的Runnable对象被转成了一个Callable的实现类,最后会调到我们的Runnable的run()方法。

在这里插入图片描述

ScheduledFutureTask.super.runAndReset()

执行 ScheduledFutureTask.super.runAndReset() 会进入到 FutureTask的runAndReset()方法。

    protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {// 也是调用Callable的call()方法c.call();ran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}

FutureTask的runAndReset()方法也是调用Callable的call()方法,只是不会接收返回值并设置到FutureTask的结果中。

在这里插入图片描述

setNextRunTime()

任务执行完之后,就会调用setNextRunTime()方法更新下一次的执行时间。

        private void setNextRunTime() {long p = period;// 调用scheduleAtFixedRate方法提交进来的任务会进这个分支if (p > 0)time += p;// 调用scheduleWithFixedDelay方法提交进来的任务会进这个分支elsetime = triggerTime(-p);}

setNextRunTime()方法会更新ScheduledFutureTask的time属性为下一次的执行时间,我们调用的scheduleAtFixedRate方法进来的任务,会简单的把time加一个period周期。

在这里插入图片描述

reExecutePeriodic(outerTask)

设置好了任务下一次的运行时间后,就会调用reExecutePeriodic(outerTask)方法把任务重新放回队列中。

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {// 调用DelayedWorkQueue的add方法把任务重新放回队列super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}

可以看到就是调用DelayedWorkQueue的add方法把任务重新放回队列。

在这里插入图片描述

以上就是ScheduledThreadPoolExecutor的核心流程,包含了任务被提交到ScheduledThreadPoolExecutor之后的处理,从队列中被取出执行,执行完成后计算下次执行时间然后重回队列的整个过程。

在这里插入图片描述

但是在我们上面的流程分析中,我们跳过了DelayedWorkQueue的相关方法,也就是任务入队列时的DelayedWorkQueue#add方法,和任务出队列时的DelayedWorkQueue#take方法,下面我们就着重分析DelayedWorkQueue的相关源码。

DelayedWorkQueue

小顶堆

要理解DelayedWorkQueue的原理,首先要熟悉小顶堆。

小顶堆是一个特殊的二叉树。它不是全局有序,但是保证每个父节点都比两个子节点要小。而且小顶堆通常不会真的使用树结构来实现,而是用一个数组来模拟了树结构,每个树节点在数组中都有对应的下标,获取子节点也是通过下标换算来取得。

当需要获取父节点的子节点时,假如父节点的下标是n,那么左子节点的下标就是2n+1,右子节点的下标就是2n+2。当需要通过子节点寻找父节点时,假设子节点的下标是n,那么父节点就是 (n - 1) / 2。

当向小顶堆中加入新节点时,先添加在堆底,也就是添加在数组的最末尾。然后通过下标换算找到父节点的位置,与父节点进行比较,发现比父节点小,则跟父节点交换位置,然后继续通过下标换算找到新的父节点,再次比较,直到某一次比较发现比父节点大,则不再交换位置,此时新节点就放在这个位置上,整个过程是不断的把新节点从堆底往上提,所以叫做堆的向上调整。

下面是一个往小顶堆添加新节点的例子:

假设现在有一个小顶堆:[1, 4, 3, 8, 5, 6, 7],我们要往堆中加入值为2的节点。那么首先是把它放到数组尾部(堆底),[1, 4, 3, 8, 5, 6, 7, 2];现在新节点的下标是7,通过下标运算 (7 - 1) / 2 = 3,下标为3的节点就是父节点,与父节点进行比较,发现 2 < 8,因此和父节点交换位置,新节点来到下标为3的位置 [1, 4, 3, 2, 5, 6, 7, 8]

在这里插入图片描述
进入下一轮,再次通过下标换算 (3 - 1) / 2 = 1,找到下标为1的父节点,与父节点进行比较,发现 2 < 4,那么继续跟父节点交换位置,新节点来到下标1的位置 [1, 2, 3, 4, 5, 6, 7, 8]。

在这里插入图片描述
进入下一轮,再次通过下标换算 (1 - 1) / 2 = 0,找到下标为0的父节点,与父节点进行比较,发现 2 > 1,不再跟父节点交换位置,新节点停留在下标为1的位置,调整完后堆就是 [1, 2, 3, 4, 5, 6, 7, 8]。

在这里插入图片描述
以上就是新节点加入小顶堆的过程。我们再来看一个从堆中弹出节点的例子。

还是上面的堆结构,假如我们要从堆中弹出堆顶节点,也就是值为1的节点,那么在把堆顶节点1作为结果返回之前,会进行堆调整使得堆再次符合小顶堆的规则。

首先把堆底节点节点8提到堆顶,然后数组长度减1。

在这里插入图片描述
此时节点8来到下标0的位置,也就是堆顶,开始做向下调整。根据下标换算找到子节点2和3,它要和最小的子节点PK,也就是跟2PK,发现比2大那么8和2交换位置,把2提上来。

在这里插入图片描述
进入下一轮,此时8来到了下标1的位置,通过下标换算找到两个子节点4和5,与最小的子节点4进行PK,发现比4大,继续交换位置。

在这里插入图片描述

此时节点8已经来到堆底了,那么堆的向下调整结束。

在这里插入图片描述

可以发现,通过堆调整,小顶堆总是可以保证堆顶元素最小,并且数值越小的元素,就越靠近堆顶,会越先被取走。如果堆中的节点是定时任务,那么图中节点的数值就是到期时间,小顶堆总是可以保证最快到期的任务总是在堆顶,我们取任务时直接取堆顶任务,判断是否到期即可。

了解了小顶堆的原理,我们就可以来看DelayedWorkQueue的源码了。

DelayedWorkQueue成员变量

    static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;private final Condition available = lock.newCondition();}        

queue 是用于存放 ScheduledFutureTask 的堆数组,初始化容量是16,RunnableScheduledFuture是ScheduledFutureTask 的父类。

lock是ReentrantLock可重入锁对象,在向DelayedWorkQueue添加任务元素和获取任务时需要先加锁。

size是当前堆大小,也就是堆中有多少个节点。这个跟数组中的元素个数不一样,数组中的元素个数有可能比size多,但是数组中有可能有一部分是无效元素。

leader线程是等待堆顶任务到期取走的线程。

available是一个Condition条件队列,用于存放排队等待获取任务的线程,非leader线程会在available中一直等待直到被唤醒,leader线程会在available中等待堆顶任务的到期时间(带时长的等待,到期自动唤醒)。

在这里插入图片描述

void add(Runnable e)

接下来看一下add方法,add方法是把一个Runnable对象放入到队列中(这里的Runnable就是ScheduledFutureTask,ScheduledFutureTask 间接继承了Runnable接口)。

        public boolean add(Runnable e) {return offer(e);}

add方法调用了offer方法。

        public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;// 1.先上锁final ReentrantLock lock = this.lock;lock.lock();try {int i = size;// 2.判断是否需要进行数组扩容,如果需要则调用grow方法进行扩容if (i >= queue.length)grow();size = i + 1;// 3.把任务放入堆中if (i == 0) {// 堆中没有任务,则直接放在堆顶,不用做调整queue[0] = e;setIndex(e, 0);} else {// 把任务放入堆中,然后从底向上做调整siftUp(i, e);}// 4.如果堆顶任务是刚刚入队列的任务,那么重置leader,唤醒available中的线程去竞争leaderif (queue[0] == e) {leader = null;available.signal();}} finally {// 5.释放锁lock.unlock();}return true;}

一共分5步:

  1. 先获取ReentrantLock锁,获取到ReentrantLock锁的才能往下进行,获取不到锁的线程要在AQS的同步队列中阻塞等待。
  2. 判断是否需要进行数组扩容,如果需要则调用grow方法进行扩容。
  3. 把任务放入堆中。这里如果判断堆中没有任务,那么直接放到堆顶就可以,没有必要做堆调整;否则就要把任务放到堆底,然后从底往上做堆调整。
  4. 如果发现堆顶任务变成了刚刚添加的任务,那么就要重置leader,唤醒available中的一个线程去竞争leader。因为原来的leader等待的任务已被拉下去了,不再是堆顶任务,自然等待这个任务到期的线程也不能继续当leader。
  5. 最后就是释放锁。

在这里插入图片描述

void grow()

grow()是堆数组进行扩容的方法,当数组中元素的个数已满时,再往堆中添加任务,会调用该方法进行扩容。

        private void grow() {int oldCapacity = queue.length;int newCapacity = oldCapacity + (oldCapacity >> 1);if (newCapacity < 0) // overflownewCapacity = Integer.MAX_VALUE;queue = Arrays.copyOf(queue, newCapacity);}

oldCapacity是老数组的长度,oldCapacity + (oldCapacity >> 1) 就是 oldCapacity的1.5倍,得到新数组长度,然后把老数组的元素复制到新数组。

在这里插入图片描述

void siftUp(int k, RunnableScheduledFuture<?> key)

然后再看一下堆向上调整的方法。

        private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0)break;queue[k] = e;setIndex(e, k);k = parent;}queue[k] = key;setIndex(key, k);}

k是新任务当前所处的位置,key就是新加入的任务。

while (k > 0)表示一直循环向上调整,直到来到堆顶就停止,或者中途break掉。

int parent = (k - 1) >>> 1; 就是找到当前位置k的父节点的位置,(k - 1) >>> 1 就是 (k - 1) / 2,只不过这里用了位移运算加速。

RunnableScheduledFuture<?> e = queue[parent]; 就是根据父任务下标parent,获取到父任务e。

if (key.compareTo(e) >= 0) break; 这一行的意思就是如果当前任务的过期时间比父任务大,那么调整结束,新任务就放在位置k。因为 key.compareTo(e) 返回大于等于0的数值,表示新任务key的过期时间比父任务e的过期时间大,而当前又是小顶堆,过期时间越小的越靠近堆顶,所以新任务key只能放在父任务e的下面。

queue[k] = e; 就是上面的if分支没有进去,代表新任务key的过期时间比父任务e的过期时间要小,那么新任务key是应该要处于比父任务e更靠上的位置的,因此这里先把父任务e拉下来,也就是是移到新任务的当前位置k,然后新任务key继续向上跟更靠上的父任务做比较。

setIndex(e, k); 就是更新父任务在堆中的位置,

k = parent; 就是更新新任务的在数组中的位置,方便做下一轮的比较。

当跳出while循环后,k的值就是新任务在数组中的位置,queue[k] = key; 就是把新任务放到它该放的位置。

最后 setIndex(key, k); 更新新任务在堆中的位置。

在这里插入图片描述

RunnableScheduledFuture<?> take()

看完add方法,我们再来看take方法。take方法是从队列中获取任务的方法,该方法会阻塞当前线程直到获取到任务为止。

        public RunnableScheduledFuture<?> take() throws InterruptedException {// 1.先获取锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];// 2.如果堆顶任务为空,在available中等待if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);// 3.如果堆顶任务已经到期了,取走,取走前要调做堆调整if (delay <= 0)return finishPoll(first);first = null;// 4.如果此时leader线程不为null,要在available中等待if (leader != null)available.await();else {// 5.设置当前线程为leader线程,然后在available等待堆顶任务到期,醒来后置空leaderThread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 6.leader为空,堆中还有任务,唤醒available中等待的线程if (leader == null && queue[0] != null)available.signal();// 7.释放锁lock.unlock();}}

总共分7步:

  1. 还是先获取锁,防止并发问题。
  2. 获取锁成功后,就进入一个for循环自旋直到取到任务。如果检查到如果堆顶任务为空,在available中等待。
  3. 堆顶任务不为空,并且堆顶任务已经到期了,那么就取走堆顶任务并执行,但是在取走前要调做堆调整,finishPoll(first)就是进行堆调整的方法。
  4. 堆顶任务没到期,并且此时leader线程不为null,当前线程就要在available中等待。
  5. 堆顶任务未到期,并且此时leader线程为null,那么就设置当前线程为leader线程,然后在available进行带超时时间的等待,等待时间就是堆顶任务的到期时间。到期醒来后,会设置leader为null,然后就是进入下一轮循环时取走堆顶任务。
  6. 最后退出方法前,检查到leader为null,并且堆中还有任务,那么唤醒available中的一个线程。

在这里插入图片描述

RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f)

finishPoll方法做的事情就是在取走堆顶任务之前,对小顶堆做调整,然后才把堆顶任务返回。

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}

int s = --size; 是堆元素个数减1。

RunnableScheduledFuture<?> x = queue[s]; 获取到的是堆底任务,准备用这个任务来做堆调整。

queue[s] = null; 把堆底置空。

siftDown(0, x); 真正做堆调整的方法,从堆顶(数组下标为0)位置向下做堆调整。

setIndex(f, -1); 设置原先的堆顶任务(也就是即将要作为结果返回的任务)的位置为-1,表示该任务已被取走。

return f; 返回原先的堆顶任务,堆顶任务是作为参数传进来的,原封不动返回出去。

在这里插入图片描述

void siftDown(int k, RunnableScheduledFuture<?> key)

我们再来看一下真正做堆调整的siftDown方法,siftDown方法是拿着原先的堆底任务作为当前任务去做堆的向下调整,它跟我们上面描述的从顶向下做堆调整的流程是一致的。

        private void siftDown(int k, RunnableScheduledFuture<?> key) {int half = size >>> 1;while (k < half) {int child = (k << 1) + 1;RunnableScheduledFuture<?> c = queue[child];int right = child + 1;if (right < size && c.compareTo(queue[right]) > 0)c = queue[child = right];if (key.compareTo(c) <= 0)break;queue[k] = c;setIndex(c, k);k = child;}queue[k] = key;setIndex(key, k);}

int half = size >>> 1; 取到的是堆大小一半的位置half,然后 while (k < half) 循环直到当前位置k过了half 就停止。因为过了位置half就是小顶堆的最底一层的位置,没必要再往下了。

然后再下面 int child = (k << 1) + 1; 就是获取左孩子的位置,也就是 (k * 2)+ 1,然后int right = child + 1;就是右孩子的位置。

if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; c是最终要和当前父节点PK的子任务,这里就是两个子任务PK一下,看谁胜出就拿谁去跟父节点PK。

if (key.compareTo© <= 0) break; 就是如果父节点的任务到期时间比子任务小,那么父节点就放在当前位置k,无需再往下调整。

queue[k] = c; 就是把子节点往上提到当前位置k,因为执行到这里代表当前父节点没有PK过子节点,所以自然要把子节点往上提。

setIndex(c, k); 更新子节点在堆数组的位置记录。

k = child; 当前位置k更新为子节点原先的位置,方便做下一轮调整。

queue[k] = key; 就是在while循环结束后,得到了当前任务该放的位置k,把当前任务放在位置k。

setIndex(key, k); 最后更新当前任务在堆数组的位置记录。

在这里插入图片描述

总结

到这里,整个ScheduledThreadPoolExecutor的原理和源码都分析完毕了。

总体逻辑:

  1. 用了一个延时队列DelayedWorkQueue去存放任务,延时队列又使用一个小顶堆去存放任务,小顶堆中的任务ScheduledFutureTask会按照到期时间time从小到大进行排序。
  2. 当我们提交定时任务到ScheduledThreadPoolExecutor时,任务会被放入小顶堆中,然后会从底往上做堆调整。
  3. 任务放入堆中之后,检查是否有必要创建新线程,如果有必要则创建新线程。
  4. ScheduledThreadPoolExecutor中的线程会不停的循环从DelayedWorkQueue中获取任务并执行。
  5. 线程从DelayedWorkQueue中获取任务时,并不是想取就能取,而是要当上了leader才能取。
  6. leader线程每次取任务都是从堆顶取走一个,而且要等待堆顶任务到期才能取走,堆顶任务还没到期前,leader线程会在available条件队列中等待一定的时间后自动唤醒,其他非leader线程就要在available条件队列中等待leader线程执行完任务唤醒。
  7. 每次取走一个任务后,会拿到堆底任务,从堆顶向下做堆调整。
  8. 定时任务执行完成后,会计算并更新下一次的执行时间,然后重写放回到队列中。

在这里插入图片描述

本篇文章到此全部结束。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/207788.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(C++)三数之和--双指针法

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 算法原理 双指针法&#xff0c;不一定是说就要使用指针&#xff0c;只是一种形象的说法&#xff0c;在数组中&#xff0c;我们一般将数组下标当做指针。我们首先对数组进行排序&#xff0c;从左向右标定一个下标i&#xff0…

​LeetCode解法汇总2661. 找出叠涂元素

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你一个下…

迭代器 iterator

一、什么是 iterator? C中&#xff0c;iterator也被称为迭代器&#xff0c;其主要作用就是指向并访问容器中的元素&#xff0c;其像指针但不是指针。 PS&#xff1a; begin()函数返回一个指向容器第一个元素的迭代器&#xff1b;end()函数返回一个指向容器最后一个元素之后位…

scrapy爬虫中间件和下载中间件的使用

一、关于中间件 之前文章说过&#xff0c;scrapy有两种中间件&#xff1a;爬虫中间件和下载中间件&#xff0c;他们的作用时间和位置都不一样&#xff0c;具体区别如下&#xff1a; 爬虫中间件&#xff08;Spider Middleware&#xff09; 作用&#xff1a; 爬虫中间件主要负…

激光SLAM:Faster-Lio 算法编译与测试

激光SLAM&#xff1a;Faster-Lio 算法编译与测试 前言编译测试离线测试在线测试 前言 Faster-LIO是基于FastLIO2开发的。FastLIO2是开源LIO中比较优秀的一个&#xff0c;前端用了增量的kdtree&#xff08;ikd-tree&#xff09;&#xff0c;后端用了迭代ESKF&#xff08;IEKF&a…

YOLOv8优化策略:SENetV2,squeeze和excitation全面升级,效果优于SENet | 2023年11月最新成果

🚀🚀🚀本文改进: SENetV2,squeeze和excitation全面升级,作为注意力机制引入到YOLOv8,放入不同网络位置实现涨点 🚀🚀🚀YOLOv8改进专栏:http://t.csdnimg.cn/hGhVK 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1.SENetV2 论文:https://arxiv.org/…

FLASK博客系列5——模板之从天而降

我们啰啰嗦嗦讲了4篇&#xff0c;都是在调接口&#xff0c;啥时候能看到漂亮的页面呢&#xff1f;别急&#xff0c;今天我们就来实现。 来我们先来实现一个简单的页面。不多说&#xff0c;上代码。 app.route(/) def index():user {username: clannadhh}return <html>&…

AIGC创作ChatGPT源码+AI绘画(Midjourney绘画)+支持GPT-4-Turbo模型+DALL-E3文生图

一、AI创作系统 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI…

CentOS 部署 WBO 在线协作白板

1&#xff09;WBO 白板工具介绍 1.1&#xff09;WBO 白板简介 WBO 是一个自由和开源的在线协作白板。它允许多个用户同时在一个虚拟的大型白板上画图。该白板对所有线上用户实时更新&#xff0c;并且状态始终保持。它可以用于许多不同的目的&#xff0c;包括艺术、娱乐、设计和…

012 OpenCV sobel边缘检测

目录 一、环境 二、soble原理介绍 三、源码实验 一、环境 本文使用环境为&#xff1a; Windows10Python 3.9.17opencv-python 4.8.0.74 二、soble原理介绍 Sobel边缘检测是一种广泛应用于图像处理领域的边缘检测算法&#xff0c;它通过计算图像灰度函数在水平方向和垂直…

92基于matlab的引力搜索算法优化支持向量机(GSA-SVM)分类模型

基于matlab的引力搜索算法优化支持向量机&#xff08;GSA-SVM&#xff09;分类模型&#xff0c;以分类精度为优化目标优化SVM算法的参数c和g&#xff0c;输出分类可视化结果及适应度变化曲线。数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 92 引力搜索算法…

MySQL:找回root密码

一、情景描述 我们在日常学习中&#xff0c;经常会忘记自己的虚拟机中MySQL的root密码。 这个时候&#xff0c;我们要想办法重置root密码&#xff0c;从而&#xff0c;解决root登陆问题。 二、解决办法 1、修改my.cnf配置文件并重启MySQL 通过修改配置文件&#xff0c;来跳…

垃圾回收与内存泄漏

前端面试大全JavaScript垃圾回收与内存泄漏 &#x1f31f;经典真题 &#x1f31f;什么是内存泄露 &#x1f31f;JavaScript 中的垃圾回收 &#x1f31f;标记清除 &#x1f31f;引用计数 &#x1f31f;真题解答 &#x1f31f;总结 &#x1f31f;经典真题 请介绍一下 Jav…

《golang设计模式》第三部分·行为型模式-08-状态模式(State)

文章目录 1. 概念1.1 作用1.1 角色1.2 类图 2. 代码示例2.1 设计2.2 代码2.3 类图 1. 概念 1.1 作用 状态&#xff08;State&#xff09;指状态对象&#xff0c;用于封装上下文对象的特定状态行为&#xff0c;使得上下文对象在内部状态改变时能够改变其自身的行为。 1.1 角色…

Modbus平台:协议中间件(支持Modbus TCP、RTU、ASCII)

该程序可放置外网中&#xff0c;适用于DTU长连接&#xff08;心跳包必须包含DTU&#xff0c;可以是tcp/udp&#xff09;&#xff0c;也可以在内网中&#xff0c;短连接访问设备server 支持协议&#xff1a;Modbus TCP | RTU | ASCII 连接方式&#xff1a;TcpAtive: TCP主动 | …

CGAL的三维曲面网格生成

1、介绍 此程序包提供了一个函数模板&#xff0c;用于计算三角网格&#xff0c;以近似表面。 网格化算法要求仅通过一个能够判断给定线段、直线或射线是否与曲面相交&#xff0c;并且如果相交则计算交点的oracle来了解待网格化的表面。这一特性使该软件包具有足够的通用性&…

windows11 phpstudy_pro php8.2 安装redis扩展

环境&#xff1a;windows11 phpstudy_pro php8.2.9 一、命令查看是否安装redis扩展 在对应网站中通过打开&#xff0c;&#xff0c;选择对应的PHP版本&#xff0c;用命令 php -m 查看自己的php 有没有redis扩展 上面如果有&#xff0c;说明已经安装了,如果没有安装&#xff1…

小米智能摄像头mp4多碎片手工恢复案例

小米智能摄像头mp4多碎片手工恢复案例 智能摄像头目前在市场上极为常见&#xff0c;仅需要一张存储卡即可实现视频、音频的采集&#xff0c;同时可以通过手机APP进行远程控制&#xff0c;相比传统安防品牌成本更低、更容易部署。在智能摄像头品牌中小米算是绝对的大厂&#xf…

道可云会展元宇宙平台全新升级,打造3D沉浸式展会新模式

随着VR虚拟现实、人工智能、虚拟数字人等元宇宙技术的快速发展&#xff0c;各个行业正试图通过元宇宙技术寻求新的发展突破口&#xff0c;会展行业也不例外。会展作为经贸领域的重要产业形态&#xff0c;越来越多的企业和组织开始寻求通过元宇宙技术为展会赋能&#xff0c;以满…

numpy知识库:深入理解numpy的repeat函数和numpy数组的repeat方法

前言 numpy中的repeat函数顾名思义&#xff0c;可以将给定的数组沿着指定的轴重复多次&#xff0c;生成一个新的数组。但具体如何重复呢&#xff1f;本次博文就来探讨并试图回答这个问题&#xff0c;感兴趣的小伙伴可以继续阅读下去&#xff0c;希望对你有所启示~ numpy中的r…