1、CountDownLatch介绍
CountDownLatch 又称为“倒计数门阀”,但大多数称之为“计数器”,是juc包下的一个工具类;
CountDownLatch 核心功能是:用于一个活多个线程等待其他线程执行完成的一组操作。
CountDownLatch 中有个全局的计数器count,当前主线程会执行 CountDownLatch 的await
方法进入阻塞状态,每当有一个子线程完成任务时,计数器就会减1,当count变为0时表示
所有的子线程都已经执行完成,此时可以唤醒当前阻塞的主线程执行后续操作。
CountDownLatch 是基于AQS的共享锁来完成线程的阻塞和唤醒,线程的阻塞也是阻塞在AQS
的双向链表上的;采用AQS的全局变量state 来记录初始化计数器的数值。
2、核心属性&常用构造方法
CountDownLatch 的核心属性只有一个,即 sync,Sync 是 CountDownLatch的内部类,
其继承AQS,由构造方法可以知道CountDownLatch的基本操作底层实现都是基于AQS的
//同步器private final Sync sync;public CountDownLatch(int count) {//todo 注意:// count 可以等于0,但等于0没有意义,若count==0,则await() 方法是不会挂起的if (count < 0) throw new IllegalArgumentException("count < 0");/*** 从这里可以看出,count初始化和核心操作均由同步器Sync完成*/this.sync = new Sync(count);}
3、使用场景
有以下场景:
如果有三个业务需要并行处理,而且需要知道3个业务全部处理完成了。
对于这个场景,则可以通过 CountDownLatch 来实现。
给 CountDownLatch 设置一个数值3,然后创建3个线程来分别处理这3个任务;
每个线程执行完成后,调用CountDownLatch的 countDown 方法让计数器减1,
(计数器的值是3);主线程则可以在业务处理时调用 CountDownLatch 的await 方法
,让主线程进入挂起阻塞,直到 CountDownLatch 的计数器值编为0后,主线程才会被
唤醒执行后续操作。如下图所示:
4、使用示例
针对上边的场景,写个demo,展示 CountDownLatch 的基本用法
示例代码如下:
public class CountDownLatchExample1 {public static void main(String[] args) throws InterruptedException {//先获取商品编号列表int[] products = getProductsByCategoryID();//使用Stream 流,将商品编号列表中的每个商品转换为 ProductPriceList<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());//定义CountDownlatch,计数器的数量等于子任务个数CountDownLatch latch = new CountDownLatch(products.length);list.forEach(//为每个商品开启一个计算子任务pp -> new Thread(() -> {System.out.println(pp.getProdID()+" -> start calculate price.");try {//模拟调用其他系统耗时TimeUnit.SECONDS.sleep(current().nextInt(10));if(pp.prodID %2 == 0){pp.setPrice(pp.prodID*0.9D);}else {pp.setPrice(pp.prodID*0.71D);}System.out.println(pp.getProdID()+" -> start calculate completed.");} catch (InterruptedException e) {e.printStackTrace();}finally {//计数器CountDown,子任务执行完成latch.countDown();}}).start());//主线程阻塞,等待所有子任务结束,如果有一个子任务没有结束会一直等待//可以给阻塞加上超时时间latch.await();System.out.println("All of prices calculate finished");list.forEach(System.out::println);}//获取商品编号列表private static int[] getProductsByCategoryID(){//商品列表编号为从1,10的数字return IntStream.rangeClosed(1,10).toArray();}//定义商品类,有2个成员变量:商品编号和商品价格private static class ProductPrice{private final int prodID;//商品编号private double price;//商品价格public ProductPrice(int prodID){this(prodID,-1);}public ProductPrice(int prodID,double price){this.prodID = prodID;this.price = price;}public int getProdID(){return this.prodID;}public void setPrice(double price){this.price = price;}@Overridepublic String toString() {return "ProductPrice{" +"prodID=" + prodID +", price=" + price +'}';}}
}
5、常用方法解析
CountDownLatch 中最常用的方法就3个,即 await、带超时时间的await、countDown 这三个
方法,带超时时间的await方法与普通的await方法功能差不多,主要区别是带超时时间的await
的方法被唤醒由2中方式,即:
1)CountDownLatch计数器的值为0;
2)等待时间超过了超时时间
带超时时间的await方法实现流程就不看了,只看下普通的await方法。
5.1、await() 方法
await 方法功能是判断当前 CountDownLatch 的计数器值是否为0,若为0,则直接唤醒阻塞
的线程执行后续任务,若不为0,则将当前调用await()方法的线程以共享锁的方式放入AQS
的双向链表进行阻塞,一直阻塞到 CountDownLatch 的计数器值是否为0 时被唤醒。
await()方法代码如下:
在await 方法内部,把业务委托给CountDownLatch内部类 Sync的方法
acquireSharedInterruptibly 执行的。其中内部类Sync是 AQS 的一个子类,所以Sync也可以
看成是一个AQS。
还有一点要注意,即:acquireSharedInterruptibly传入的参数 “1” 在这里是没有意义的,可以
不用管。
5.2、acquireSharedInterruptibly(int arg) 方法
acquireSharedInterruptibly 方法功能是获取可中断的共享锁;在获取锁之前先判断当前线程
是否已经被其他线程中断,若已经被中断,则抛出中断异常并退出。
若当前线程没有被中断,则执行 tryAcquireShared() ,若 tryAcquireShared() 方法执行失败
,即 tryAcquireShared() 返回值小于0,则将当前线程放入AQS双向链表中阻塞。
acquireSharedInterruptibly 方法代码如下:
5.3、tryAcquireShared(int arg) 方法
tryAcquireShared 方法字面上的含义是 尝试获取共享锁,但 tryAcquireShared 由各个子类
实现,所以其在每个子类中含义也不一样。
在 CountDownLatch.Sync 中 tryAcquireShared方法的功能是判断 CountDownLatch 计数器
数值是否为0,若为0,则返回1,否则返回-1
tryAcquireShared 代码如下所示:
5.4、countDown() 方法
countDown() 方法没执行一次,CountDownLatch 计数器的值就会减1,直到计数器的值为
0 为止。
countDown() 方法中,把业务逻辑委托给内部类Sync方法 releaseShared 执行。
countDown 方法代码如下:
5.5、releaseShared(int arg) 方法
releaseShared 方法功能是释放共享锁,并唤醒后续阻塞的节点线程;
releaseShared 方法代码如下:
在 releaseShared 我们只需要关注方法 tryReleaseShared 的实现,其他方法的实现请参考
前边的“AQS(二):共享锁的获取和释放”
5.6、tryReleaseShared(int arg)
tryReleaseShared 方法的字面量含义是 尝试释放共享锁,但 tryReleaseShared 是一个默认
方法,其由AQS各个子类实现,其在 CountDownLatch.Sync 中的实现功能是 将计数器的值
减1,若计数器的值减1后为0,则返回true。
tryReleaseShared 方法代码如下:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero 递减计数;信号时过渡到零for (;;) {//获取当前的state值,代表完成整个操作数int c = getState();//c等于0,直接返回false//若c已经为0,表示计数器已经完成,当前线程是无效的if (c == 0)return false;//计算更新值,在 CountDownLatch 中表示计数器减1,有个现成执行完成int nextc = c-1;if (compareAndSetState(c, nextc))//CAS原子的修改state的值//CAS修改成功后,判断当前线程是否是最后一个完成的线程,即nextc是否等于0//若 nextc == 0 此时可能会有线程在await方法处挂起,则需要在countDown 处唤醒return nextc == 0;}}