文章目录
- 前言
- CountDownLatch
- CountDownLatch应用
- CountDownLatch核心源码
- Semaphore
- Semaphore应用
- Semaphore核心源码
- CyclicBarrier
- CyclicBarrier应用
- CyclicBarrier核心源码
- 总结
前言
JUC 是Java并发编程工具类库,提供了一些常用的并发工具,例如锁、信号量、计数器、事件循环、线程池、并发集合等。这些工具可以帮助开发人员简化并发编程的复杂性,提高程序效率和可靠性。
CountDownLatch
CountDownLatch应用
CountDownLatch本身就好像一个计数器,可以等待一个或多个线程完成后再执行,其基于AQS实现。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {CountDownLatch countDownLatch = new CountDownLatch(3);new Thread(() -> {System.out.println("111");countDownLatch.countDown();}).start();new Thread(() -> {System.out.println("222");countDownLatch.countDown();}).start();new Thread(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("333");countDownLatch.countDown();}).start();// 主线会阻塞在这个位置,直到CountDownLatch的state变为0countDownLatch.await();System.out.println("main");
}
CountDownLatch核心源码
// CountDownLatch 的有参构造
public CountDownLatch(int count) {// 健壮性校验if (count < 0) throw new IllegalArgumentException("count < 0");// 构建Sync给AQS的state赋值this.sync = new Sync(count);
}
countDown方法,本质就是调用了AQS的释放共享锁操作
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// 唤醒在AQS队列中排队的线程。doReleaseShared();return true;}return false;
}// countDownLatch实现的业务
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;// state - 1int nextc = c-1;// 用CAS赋值if (compareAndSetState(c, nextc))return nextc == 0;}
}
// 如果CountDownLatch中的state已经为0了,那么再次执行countDown跟没执行一样。
// 而且只要state变为0,await就不会阻塞线程。
功能都是AQS提供的,只有tryReleaseShared需要实现的类自己去编写业务。
await方法,调用了AQS提供的获取共享锁并且允许中断的方法
// await方法
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AQS获取共享锁并且允许中断的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// countDownLatch操作if (tryAcquireShared(arg) < 0)// 如果返回的是-1,代表state肯定大于0doAcquireSharedInterruptibly(arg);
}// CountDownLatch实现的tryAcquireShared
protected int tryAcquireShared(int acquires) {// state为0,返回1,。否则返回-1return (getState() == 0) ? 1 : -1;
}// 让当前线程进到AQS队列,排队去
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {// 将当前线程封装为Node,并且添加到AQS的队列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {// 再次走上面的tryAcquireShared,如果返回的是的1,代表state为0int r = tryAcquireShared(arg);if (r >= 0) {// 会将当前线程和后面所有排队的线程都唤醒。setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
Semaphore
Semaphore应用
Semaphore一般用于流控。比如有一个公共资源,多线程都可以访问时,Semaphore可以当作信号量做限制。每当有一个线程获取连接对象时,对信号量-1,当这个线程归还资源时对信号量+1。如果线程拿资源时,发现Semaphore内部的资源个数为0,就会被阻塞。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {// 声明信号量Semaphore semaphore = new Semaphore(1);// 能否去拿资源semaphore.acquire();// 拿资源处理业务System.out.println("main");// 归还资源semaphore.release();
}
Semaphore核心源码
Semaphore有公平和非公平两种竞争资源的方式。
//
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}// 设置资源个数,State其实就是信号量的资源个数
Sync(int permits) {setState(permits);
}
在调用 acquire 获取资源时也是基于AQS提供的获取共享锁方法。
release就是将state+1,归还资源。
// 两个一起 阿巴阿巴
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// 唤醒在AQS中排队的Node,去竞争资源doReleaseShared();return true;}return false;
}// 信号量实现的归还资源
protected final boolean tryReleaseShared(int releases) {for (;;) {// 拿stateint current = getState();// state + 1int next = current + releases;// 资源最大值,再+1,变为负数if (next < current)throw new Error("Maximum permit count exceeded");// CAS 改一手if (compareAndSetState(current, next))return true;}
}
共享锁在释放资源后,如果头节点为0,无法确认真的没有后继节点。如果头节点为0,需要将头节点的状态修改为-3,当最新拿到锁资源的线程,查看是否有后继节点并且为共享锁,就唤醒排队的线程
CyclicBarrier
CyclicBarrier应用
CyclicBarrier 一般称为栅栏,和CountDownLatch很像。CountDownLatch在操作时,只能使用一次,也就是state变为0之后,就无法再使用了。CyclicBarrier是可以复用的,他的计数器可以归位,然后再处理。而且可以在计数过程中出现问题后,重置当前CyclicBarrier,再次重新操作!
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {// 声明栅栏CyclicBarrier barrier = new CyclicBarrier(3,() -> {System.out.println("开始!");});new Thread(() -> {System.out.println("第一位选手到位");try {barrier.await();System.out.println("第一位往死里跑!");} catch (Exception e) {e.printStackTrace();}}).start();new Thread(() -> {System.out.println("第二位选手到位");try {barrier.await();System.out.println("第二位也往死里跑!");} catch (Exception e) {e.printStackTrace();}}).start();System.out.println("裁判已经到位");barrier.await();
}
CyclicBarrier核心源码
CyclicBarrier没有直接使用AQS,而是使用ReentrantLock,间接的使用AQS
// CyclicBarrier的有参
public CyclicBarrier(int parties, Runnable barrierAction) {、// 健壮性判断!if (parties <= 0) throw new IllegalArgumentException();// parties是final修饰的,需要在重置时,使用!this.parties = parties;// count是在执行await用来计数的。this.count = parties;// 当计数count为0时 ,先执行这个Runnnable!在唤醒被阻塞的线程this.barrierCommand = barrierAction;
}
线程执行await方法,会对count-1,再判断count是否为0,如果不为0,需要添加到AQS中的ConditionObject的Waiter队列中排队,并park当前线程。如果为0,证明线程到齐,需要执行nextGeneration,会先将Waiter队列中的Node全部转移到AQS的队列中,没有后继节点设置为0。然后重置count和broker标记。等到unlock执行后,每个线程都会被唤醒。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {// 相当于synchronized中使用wait和notifyfinal ReentrantLock lock = this.lock;lock.lock();try {// 里面就是boolean,默认falsefinal Generation g = generation;// 判断之前栅栏加入线程时,是否有超时、中断等问题,如果有,设置boolean为true,其他线程再进来,直接凉凉if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 对计数器count--int index = --count;// 如果--完,是0,代表突破栅栏,干活!if (index == 0) { // 默认falseboolean ranAction = false;try {// 如果你用的是2个参数的有参构造,说明你传入了任务,index == 0,先执行CyclicBarrier有参的任务final Runnable command = barrierCommand;if (command != null)command.run();// 设置为trueranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// --完之后,index不是0,代表还需要等待其他线程for (;;) {try {// 如果没设置超时时间。 await()if (!timed)trip.await();// 设置了超时时间。 await(1,SECOND)else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}// 挂起线程
public final void await() throws InterruptedException {// 允许中断if (Thread.interrupted())throw new InterruptedException();// 添加到队列(不是AQS队列,是AQS里的ConditionObject中的队列)Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 挂起当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}
}// count到0,唤醒所有队列里的线程线程
private void nextGeneration() {// 这个方法就是将Waiter队列中的节点遍历都扔到AQS的队列中,真正唤醒的时机,是unlock方法trip.signalAll();// 重置计数器count = parties;// 重置异常判断generation = new Generation();
}
总结
使用这些工具类时需要注意:
- Semaphore的使用要避免死锁和过度同步导致的性能问题。
- CyclicBarrier在屏障点之后的代码要保证所有线程都能正确执行,否则可能导致部分线程一直等待。
- CountDownLatch的countDown方法要保证在所有线程执行完毕之前被调用,否则可能导致部分线程一直等待。
根据具体的应用场景选择合适的工具类,正确使用并合理设计并发策略,可以提高程序的效率和可靠性。