大家好,我是此林。
今天来分享一下CountDownLatch的底层源码。
CountDownLatch 是 Java 并发包 (java.util.concurrent) 中的线程之间同步工具类,主要用于协调多个线程的执行顺序。其核心思想是通过计数器实现线程间的"等待-唤醒"机制,也就是AQS那一套。
话不多说,先上一个例子。
@Slf4j
public class A {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 5; i++) {new Thread(() -> {try {latch.await();log.info(Thread.currentThread().getName() + ":启动");} catch (InterruptedException e) {throw new RuntimeException(e);}}, "线程-"+i).start();}log.info("准备中...");Thread.sleep(5000);latch.countDown();}
}
这段代码里,5个线程需要等待主线程统一发号施令,然后才会启动执行。
可以看到,主线程休眠5秒后,发号施令,5个线程才各自启动。
1. CountDownLatch的源码
接下来,进入正题,来看下CountDownLatch的源码。
怎么看源码呢?一说到看源码,有些朋友可能觉得源码冗长,眼花缭乱的,下面说下具体方法。
1.1. 先看整体结构。
(快捷键:Alt + 7)
可以看到,CountDownLatch 里还有个 Sync 静态内部类,这个类继承了 AQS,重写了 tryAccquireShared() 和 tryReleaseShared() 方法。
其他方法很常见了吧,之前案例里就用过。
1. CountDownLatch(int n) :构造方法,传入计数器。
2. await() :线程阻塞等待到 CountDownLatch 减到 0。
3. latch.await(1, TimeUnit.SECONDS):线程最多阻塞等待1秒钟,1秒钟过后无论CountDownLatch 是否减到 0,都开始执行。
4. countDown():计数器的值减1。
1.2. 接下来,我们去看各个方法的源码。
1. CountDownLatch(int n)构造方法
CountDownLatch latch = new CountDownLatch(1);
实际上,CountDownLatch构造方法里会实例化静态内部类 Sync。
这个 setState() 方法就是父类AQS里的方法,设置 state 属性。
我们都知道 AQS 里锁的状态、重入等等都是由这个 state 公共变量来维护的。
为了保证多线程环境下内存的可见性,state 变量用了 volatile 关键字修饰。
2. countDown() 方法
具体会执行到这个方法:
这个无非是无限for循环,不断CAS自旋,直到成功、原子性地把state减1。
减一之后如果发现state等于0了,那么就返回true,接下来会去唤醒阻塞队列的线程。
这里会去唤醒阻塞队列的第一个线程,即头结点的后继节点。
问:为什么是头节点的后继?
答:AQS 队列设计,头节点是虚拟节点(不关联线程),仅作为占位符。head (dummy) → Node1 (thread1) → Node2 (thread2) → ...
再问:那为什么只是唤醒第一个线程?不应该唤醒所有线程吗?
答:第一个线程被唤醒了之后,它最后会执行AQS的acquire()方法。
如果是共享模式下,会额外执行这一步,它会自动唤醒下一个线程,也就是会不断链式唤醒,相当于唤醒所有线程了。
3. 再来看 await() 方法
await() 的作用是让当前线程阻塞等待。
如果执行 countdown() 方法发现 CountDownLatch 减到 0了,那就会唤醒阻塞等待的线程。
latch.await();
这个 acquireSharedInterruptibly() 是AQS里的 模板方法 。
这个 acquireSharedInterruptibly() 也很简单,就一个 if 条件判断,如果为 true,就抛异常。
第一个条件:
Thread.interrupted()
如果线程当前已经是中断状态,那直接抛异常,没必要阻塞等待下去了。
第二个条件:
tryAcquireShared(arg) < 0 &&acquire(null, arg, true, true, false, 0L) < 0)
这个 tryAcquireShared(arg) 实际执行的就是 Sync 里重写 的方法,之前我们类的结构图里也看过。tryAcquireShared(arg) 特别简单,就一行代码。
如果 state 计数器为0了,返回1,state计数器不为0,返回-1。
很明显,
tryAcquireShared(arg) < 0
代表 state 计数器不为0,就说明当前线程要阻塞等待了,继续进入 acquire() 方法,加入阻塞队列。
这个 acquire() 方法就是 AQS 的模板方法,它是AQS同步机制的核心方法,实现了独占/共享模式的资源获取逻辑,支持可中断和超时特性。ReentrantLock/Semaphore等同步器的底层都依赖该方法。
3. AQS 的 acquire() 方法
acquire() 方法没啥花头,就是把当前线程通过不断 CAS 自旋直到成功加入阻塞队列。
整体结构上是个 for 无限循环。
流程图如下:
每次for循环,都会执行以下步骤:
1. 前驱节点有效性检查(无需太关注,会自动清理已取消的前驱节点)
2. 如果当前线程是阻塞队列第一个节点,那就去看下state计数器是否已经减到0了。如果减到0了,当前线程出队,调用 unpark() 方法唤醒下一个线程节点。
3. CAS尝试原子性插入阻塞队列尾部。
这个就是调用 park() 阻塞线程,如果传入了超时时间(执行 latch.await(int n, Timeunit unit),就调用 parkNanos()。
2. 总结一下
总结一下,CountDownLatch 本身就是通过 AQS 的 state 公共变量维护一个计数器。
调用 await() 方法,就是把当前线程加入到阻塞队列,直到 state 变量为 0。
调用 countdown() 方法,就是对 state 变量减一,如果减到0了,那么就唤醒阻塞队列的所有线程。
CountDownLatch也有一些缺点,比如:
它只能一次性使用,无法重置计数。如果需要多次使用 CountDownLatch,必须新建实例,不如 CyclicBarrier 可以复用。
今天的分享就到这里了。
关注我吧,我是此林,带你看不一样的世界!