CyclicBarrier
作用
同步辅助类,多个线程互相等待,直到到达同一个同步点,再继续一起执行,可有重复利用,通过构造函数可以设置一个等待的线程数,
线程调用await
方法表示自己已经到达屏障点,然后会等待其他线程也到达。如果当前线程不是最后一个到达的,它将被阻塞在这里,直到所有线程都调用了await
方法。
示例代码
public class Test {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(3);for (int i = 0; i < 3; i++) {final int threadId = i;new Thread(() -> {try {System.out.println("线程 " + threadId + " 开始执行任务");Thread.sleep((long) (Math.random() * 3000));System.out.println("线程 " + threadId + " 到达屏障点");barrier.await();// 在此处等待,直到3个线程都到达此处开始执行下面代码System.out.println("线程 " + threadId + " 继续执行后续任务");} catch (Exception e) {e.printStackTrace();}}).start();}}
}
使用场景
- 模拟并发测试:在进行并发测试时,需要模拟多个并发请求同时到达某个点的情况,
CyclicBarrier
可以方便地实现这种场景,让所有模拟的并发线程在指定点等待,然后一起继续执行后续的测试操作 - 多线程任务协同:例如在一个并行计算任务中,需要多个线程分别处理不同的数据块,当所有数据块都处理完毕后,再进行汇总或下一步的计算,这时可以使用
CyclicBarrier
来确保所有线程都完成各自的任务后再继续。
实现原理
private final ReentrantLock lock = new ReentrantLock();/** Condition to wait on until tripped */private final Condition trip = lock.newCondition();/** The number of parties */private final int parties;/** The command to run when tripped */private final Runnable barrierCommand;/** The current generation */private Generation generation = new Generation();/*** Number of parties still waiting. Counts down from parties to 0* on each generation. It is reset to parties on each new* generation or when broken.*/private int count;
内部维护一个 ReentrantLock 和一个 Condition trip = lock.newCondition();
Generation 表示第几轮,CyclicBarrier 是可以循环利用的
count 表示等待的线程数
- 调用
await
方法时,调用线程会去获取lock
锁,然后–count,count减 1,表示有一个线程已经到达屏障点。 - 接着判断
count
是否为 0,如果count
不为 0,说明还有线程未到达屏障点,此时当前线程会调用trip.await()
方法进入等待状态,并释放lock
锁,让其他线程有机会获取锁并执行await
操作。 - 当count为0 时,就唤醒所有在trip 等待的线程执行,同时Generation 次数加1