目录
1.可能会遇到的线程协作场景
2.Semaphore
3.CountDownLatch
4.CyclicBarrier
1.可能会遇到的线程协作场景
在并发编程中,线程除了独自向前运行,还可能相互之间要进行协作,以保证完成最终总的目标。可能会遇到的几种任务之间的协作:
-
情景1:限定任务数
-
由于资源有限,限制最多有多少个线程进行工作
-
-
情景2:任务之间有依赖关系
-
一个线程依赖于其它线程的执行结果,这个线程就必须等待其它线程执行完成才能继续往下走
-
-
情景3:任务分阶段
-
批量线程分阶段执行,每一个阶段是一个同步点,执行完的线程必须阻塞在同步点上等待同批的其它线程也执行完,再进入下一个阶段。由于阶段可能有多个,所以要用condition来实现。
-
-
情景4:动态调整线程数量
-
不管是情景2也好还是情景3也好,都有可能有动态调整线程的可能性
-
2.Semaphore
semaphore,信号量,用来解决情景1。
业务情景:
我们有一个资源,只允许最多 3 个线程同时访问。我们将使用 Semaphore 来实现这一功能。
代码示例:
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
private static final int MAX_CONCURRENT_ACCESS = 3;private static final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ACCESS);
public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {try {// 获取许可semaphore.acquire();System.out.println(Thread.currentThread().getName() + " 开始访问资源");// 模拟耗时操作TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + " 结束访问资源");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println(Thread.currentThread().getName() + " 被中断");} finally {// 释放许可semaphore.release();}}).start();}}
}
semaphore的实现底层其实就是AQS,没抢到资源的线程阻塞在队列中,也分了公平锁和非公平锁,其实现了AQS的共享模式tryAcquireShared(),每次去抢占资源的时候就对state做CAS减法。
acquire()去抢资源,没抢到或者抢失败了就把线程阻塞进CLH队列中:
最终调用到的是tryAcquireShared(),每次去抢占资源的时候就对state做CAS减法:
释放release()就不展开了,也是很简单的。
3.CountDownLatch
CountDownLatch,栅栏,用来解决情景2。
业务场景:
1个主线程需要等待10Worker线程完成工作才能退出。
这时候就要用CountDownLatch:
CountDownLatch countDownLatch=new CountDownLatch(10);
countDownLatch.await();//主线程阻塞在这里
其余Worker线程各自去:
countDownLatch.countDown();//每调用一次countDown,计数就会减1,减到0的时候主线程会被唤醒
countDownLatch也有一个继承AQS的Sync,countDown会去调用AQS共享模式的释放方法releaseShared()
releaseShared会CAS去对state进行-1,当发现state减到0后,会用doReleaseShared唤醒躺在CLH队列中的调用过await()的主线程:
4.CyclicBarrier
业务场景:
一共有10个线程,分阶段执行任务,每一个阶段必须所有10个线程都执行后,才能一同去执行下一个阶段的任务。
代码示例:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {// 创建一个 CyclicBarrier 实例CyclicBarrier barrier = new CyclicBarrier(10, () -> {System.out.println("All threads have arrived at the barrier. Moving to the next phase.");});
// 启动 10 个线程for (int i = 0; i < 10; i++) {new Thread(() -> work(barrier)).start();}
// 等待所有线程完成第一个阶段TimeUnit.SECONDS.sleep(2);
// 等待所有线程完成第二个阶段TimeUnit.SECONDS.sleep(2);
// 等待所有线程完成第三个阶段TimeUnit.SECONDS.sleep(2);}
private static void work(CyclicBarrier barrier) {try {// 模拟工作TimeUnit.SECONDS.sleep(1); // 暂停一段时间
// 到达屏障barrier.await();
// 模拟第二阶段的工作TimeUnit.SECONDS.sleep(1); // 暂停一段时间
// 到达屏障barrier.await();
// 模拟第三阶段的工作TimeUnit.SECONDS.sleep(1); // 暂停一段时间
// 到达屏障barrier.await();} catch (InterruptedException | BrokenBarrierException e) {Thread.currentThread().interrupt();System.err.println(Thread.currentThread().getName() + ": Interrupted or barrier broken.");}}
}
上面的代码模拟了三阶段的任务,没执行完一个阶段的任务线程就会调用CyclicBarrier的await()来等待其它的合作伙伴线程,要大家都达到后才会继续向下执行。可以看到CyclicBarrier的await()是线程同步的核心方法。一起来看看源码:
await()里面调用了doawait(),所以doawait才是核心方法:
来看看doawait()的源码:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {// 获取当前的 Generation 对象final Generation g = generation;
// 如果屏障已经损坏,则抛出 BrokenBarrierExceptionif (g.broken)throw new BrokenBarrierException();
// 如果线程被中断,则破坏屏障并抛出 InterruptedExceptionif (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}
// 减少 count 计数器的值,表示有一个线程到达了屏障int index = --count;// 如果 count 变为 0,这意味着所有线程都已经到达屏障if (index == 0) { // trippedboolean ranAction = false;try {// 如果设置了屏障动作(回调函数),则执行该动作final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 开始新的屏障周期nextGeneration();// 返回 0 表示当前线程触发了屏障动作return 0;} finally {// 如果未执行屏障动作(回调函数),则破坏屏障if (!ranAction)breakBarrier();}}
// 循环等待其他线程到达for (;;) {//阻塞在condition上(trip是个condition),这样就能将lock释放出来,后面的线程可以继续争抢try {// 如果没有设置超时时间,则等待所有线程到达if (!timed)trip.await();// 如果设置了超时时间,则等待所有线程到达或超时else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 如果线程被中断且屏障仍然有效,则破坏屏障并抛出 InterruptedExceptionif (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// 如果线程即将完成等待,即使它被中断,也会忽略中断标志并将中断标记传递给后续执行Thread.currentThread().interrupt();}}
// 如果屏障已经损坏,则抛出 BrokenBarrierExceptionif (g.broken)throw new BrokenBarrierException();
// 如果屏障周期已经改变,则返回当前线程的索引if (g != generation)return index;
// 如果设置了超时并且超时时间已到,则破坏屏障并抛出 TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放锁lock.unlock();}
}
其实可以看到上面的逻辑很简单,要是没到齐就先阻塞等待,要是到齐了就调用nextGeneration()去刷新轮次,这个方法里也就是一些资源的重置。