目录
一.synchronized + wait/notify/notifyAll = 线程通信
二.Lock + Condition 实现线程通信
三.Condition实现通信分析
四.JUC工具类的示例
一.synchronized + wait/notify/notifyAll = 线程通信
关于线程间的通信,简单举例下:
1.创建ThreadA传入共享资源对象获取锁,执行业务后wait释放锁。
public class ThreadA extends Thread{private Object lock;public ThreadA(Object lock){this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println("ThreadA start");try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadA end");}} }
2.创建ThreadB获取共享资源对象的锁,执行notify执行业务后会唤醒等待队列中的线程。
public class ThreadB extends Thread{private Object lock;public ThreadB(Object lock){this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println("ThreadB start");lock.notify();System.out.println("ThreadB end");}} }
3.创建A线程B线程验证,执行结果:
public class WaitNotify {public static void main(String[] args) {Object lock = new Object();ThreadA threadA = new ThreadA(lock);threadA.start(); ThreadB threadB = new ThreadB(lock);threadB.start();} }
执行结果:
线程间wait、notify通信的流程分析如下所示:
二.Lock + Condition 实现线程通信
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创 建出来的,换句话说,Condition是依赖Lock对象的。
public class ConditionWait implements Runnable{private Lock lock; private Condition condition; public ConditionWait(Lock lock, Condition condition) {this.lock = lock;this.condition = condition;} @Overridepublic void run() {lock.lock();try {System.out.println("start - ConditionWait...");condition.await();System.out.println("end - ConditionWait");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} }
public class ConditionNotify implements Runnable{private Lock lock;private Condition condition; public ConditionNotify(Lock lock, Condition condition) {this.lock = lock;this.condition = condition;} @Overridepublic void run() {lock.lock();try {System.out.println("start - ConditionNotify...");condition.signal();System.out.println("end - ConditionNotify...");} finally {lock.unlock();}} }
public class ConditionCommunication {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(new ConditionWait(lock, condition), "await").start(); // 1new Thread(new ConditionNotify(lock, condition), "signal").start(); // 2} }
输出结果:
如果把1和2顺序交换,ConditionWait的唤醒就会一直阻塞。输出结果:
三.Condition实现通信分析
线程 awaitThread 先通过 lock.lock()方法获取锁成功 后调用了 condition.await 方法进入等待队列,而另一个 线程 signalThread 通过 lock.lock()方法获取锁成功后调用 了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直 接进入到同步队列。
阻塞:await()方法中,在线程释放锁资源之后,如果节点 不在 AQS 等待队列,则阻塞当前线程,如果在等待队 列,则自旋等待尝试获取锁 释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。
四.JUC工具类的示例
-
CountDownLatch(倒计数门栓):
-
用途:CountDownLatch用于等待一个或多个线程完成某个操作。它初始化一个计数器,然后一个或多个线程等待这个计数器变为零,一旦变为零,等待的线程被唤醒。
-
示例:常见于主线程等待多个工作线程全部完成后再执行的场景。
CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3 // 启动三个工作线程 new Thread(() -> {// 执行任务latch.countDown(); // 减少计数器 }).start(); // 其他两个线程类似 latch.await(); // 主线程等待计数器变为0
-
-
CyclicBarrier(循环屏障):
-
用途:CyclicBarrier用于多个线程互相等待,直到所有线程都到达某个屏障点,然后同时执行下一步操作。它可重复使用,因此在每个线程完成后,可以重复使用CyclicBarrier等待下一轮。
-
示例:常见于多个线程同时进行某个任务,然后在某个点同步结果。
CyclicBarrier barrier = new CyclicBarrier(3); // 初始化屏障为3 // 启动三个工作线程 new Thread(() -> {// 执行任务barrier.await(); // 等待其他线程到达屏障 }).start(); // 其他两个线程类似
-
-
Semaphore(信号量):
-
用途:Semaphore用于控制对某一资源的访问线程数。它维护一个计数器,每次线程访问资源时,计数器减一,当计数器为零时,其他线程需要等待。
-
示例:常见于限制同时访问某一资源的线程数量的场景。
Semaphore semaphore = new Semaphore(3); // 初始化信号量为3,允许3个线程同时访问 // 启动多个线程尝试访问资源 new Thread(() -> {try {semaphore.acquire(); // 获取资源// 执行任务} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 释放资源} }).start(); // 其他线程类似
-
-
Exchanger(交换器):
-
用途:Exchanger用于两个线程之间交换数据,每个线程将数据放入Exchanger后等待,当两个线程都到达时,它们可以交换数据然后继续执行。
-
示例:常见于两个线程之间协同工作,互相交换数据的场景。
Exchanger<String> exchanger = new Exchanger<>(); // 启动两个线程,交换数据 new Thread(() -> {try {String data = "Thread A data";exchanger.exchange(data); // 等待交换数据// 处理对方线程的数据} catch (InterruptedException e) {e.printStackTrace();} }).start(); // 另一个线程类似
-