注意:CyclicBarrier不是AQS的派生类,而是CyclicBarrier内部使用了ReentrantLock.Condition
和CountDownLatch一样,都是计数减为0就可以成功获取锁
和CountDownLatch不同的是:
1:CountDownLatch的await和countdown操作是分开的即锁获取和锁释放是两个不同的操作,即一个线程先countdown,把计数-1,然后再await等待计数降为0,而CyclicBarrier中则合二为一了,就是说直接await,await一个函数做完两件事:countdown 扣减资源,然后await等待计数为02:CyclicBarrier是可充用的,也就是说唤醒一次后CyclicBarrier会重置状态,需要重新扣减计数,直到计数再次为0才会放行,而CountDownLatch则只其作用一次,一旦计数为0,则此后则不再恢复计数,也就是说此后任何请求都会直接放行
CyclicBarrier可重用的意思是内部有一个Generation变量,记录当前是第几代,如果计数为0就表示本轮计数完成了,然后通知所有阻塞的线程,通知完后再创建一个新的generation对象,就表示已经进入下一轮迭代了,其他线程被唤醒后如果检测到当前他休眠时记录的generation和当前的generation不是同一个,就表明generation已经进入下一轮了,所以自己可以成功唤醒了
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class CyclicBarrier {private static class Generation {boolean broken = false;}//CyclicBarrier是基于ReentrantLock的条件变量的private final ReentrantLock lock = new ReentrantLock();//条件变量private final Condition trip = lock.newCondition();//记录初始限额,进入下一轮的时候会把计数恢复到初始限额private final int parties;//当计数降到0的时候执行指定的函数,由用户指定private final Runnable barrierCommand;//记录当前是第几轮,进入新的一轮计数时会创建一个新的generationprivate Generation generation = new Generation();//计数,每当count降为0就表明本轮结束了,可以唤醒所有等待的线程了//进入下一轮后会把count恢复到partiesprivate int count;//进入下一轮:通知上一轮所有等待的线程,通知完后会再更新状态以进入下一轮//这个函数在持有锁的情况下才会被调用private void nextGeneration() {//signalAll就是把trip这个condition对应的condition list中的所有node丢到sync list//丢完以后signalAll就结束了,但是其他线程只是丢到了sync list,并没有获取锁//所以其他被唤醒的线程需要重新竞争锁,但是nextGeneration是在有锁的情况下调用的//所以这里signalAll后就可以直接更新generation和count因为其他线程此时没有锁,//是做不了任何事的,所以这里更新generation不会造成并发错误trip.signalAll();count = parties;generation = new Generation();}//打破栅栏,也就是终止等待,强制唤醒所有等待的线程private void breakBarrier() {generation.broken = true;count = parties;//唤醒所有等待的线程,这些线程被唤醒后会检测到generation.broker=true//就明白了是被强制打断了本次计数,所以被唤醒的线程就会直接抛异常trip.signalAll();}//这是CyclicBarrier await的核心,做两件事:1:扣减计数;2:await计数归0private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;//先获取锁lock.lock();try {//记录本线程刚开始doawait时的generation变量//当线程再次唤醒时如果检测到CyclicBarrier对象的generation变量//和它记录的不是同一个generation变量,那么线程就知道他所在的轮次已经结束了//可能是因为计数归零从而被唤醒、超时、中断异常、被break强制中断等待final Generation g = generation;//在本线程还没开始doawait时就有其他线程执行了breakBarrier强制提前结束了本轮计数if (g.broken)//那么就直接抛异常throw new BrokenBarrierException();//如果本线程已经被中断if (Thread.interrupted()) {//就调用breakBarrier强制唤醒所有等待的线程,强制结束本轮计数breakBarrier();//然后再抛中断异常throw new InterruptedException();}//1:扣减计数int index = --count;//如果计数归零,就表明本轮计数已结束,可以唤醒所有线程if (index == 0) { boolean ranAction = false;try {//当计数结束时,执行预定义的操作final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;//唤醒所有阻塞的线程,然后更新generation以进入下一轮迭代nextGeneration();return 0;} finally {//如果command不为null并且command.run抛异常了//就表明本轮计数失败,打上broken标记,然后强制唤醒所有线程//而这些被唤醒的线程会检测到broken,然后就抛异常了if (!ranAction)breakBarrier();}}//2:死循环等待被唤醒for (;;) {try {//如果没有开启超时if (!timed)//则直接condition.await,直到被其他线程signaltrip.await();else if (nanos > 0L)//否则awaitNanos,至多await这么多秒nanos = trip.awaitNanos(nanos);//如果await过程中抛出了中断异常} catch (InterruptedException ie) {//如果检测到被唤醒后时的generation和await之前的generation是同一个//则表明还是统一伦次//如果此事没有检测到被其他线程已经抛出broken//即其他人还在正常await,但是本线程已经检测到中断异常了//所以本线程就抛出broken并强制唤醒所有阻塞的线程//也就是说同一轮中只要有任意一个线程抛出了异常,// 就强制结束本轮,所有被唤醒的线程都会检测到broken然后抛异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {//否则说明已经进入下一轮了或者已经有人抛出broken了//所以这里就简单标记一下中断Thread.currentThread().interrupt();}}//检查是否是因为broken被唤醒if (g.broken)//如果是则抛异常throw new BrokenBarrierException();//检查是否已经进入下一轮了,如果是的话则说明本轮正常结束了//说明本线程是被正常signal的,也就是说本线程成功结束了本次await//可以继续往下执行了,所以此时就返回本次线程的indexif (g != generation)return index;//如果还是同一轮,则说明还没有结束,因为一个线程只能扣减一次index//如果他在await前扣减完index后index不为0//则说明他不是最后一个到达的,还需要等待其他线程到达//所以这里检测是不是因为超时被唤醒的if (timed && nanos <= 0L) {//如果超时,则表示本轮计数失败//所以这里就强制broken并唤醒所有阻塞的线程//其他线程唤醒后就会检测到broken,然后就会抛出异常了breakBarrier();//抛出超时异常throw new TimeoutException();}}} finally {//释放锁//备注:当线程发现自己到达时index刚好为0即本线程是最后一个到达的,//所以本线程需要唤醒其他线程并更新generation和count//也就是执行nextGeneration()函数,此时还没有释放lock,所以此时是安全的//因为lock.unlock会等到执行完nextGeneration后才会在finally中执行lock.unlock();}}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}public CyclicBarrier(int parties) {this(parties, null);}public int getParties() {return parties;}public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); }}public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}//重置barrierpublic void reset() {final ReentrantLock lock = this.lock;//先获取锁lock.lock();try {//然后强制broken并唤醒所有等待的线程//这些线程被唤醒后会检测到broken,然后就会抛异常breakBarrier();//唤醒其他线程后就更新generation表示进入下一轮nextGeneration(); } finally {//释放锁lock.unlock();}}//表示有多少个线程在等待public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {//parties表示初始限额,count表示还剩几个线程没有到return parties - count;} finally {lock.unlock();}}
}