💐个人主页:初晴~
📚相关专栏:多线程 / javaEE初阶
JUC是“Java Util Concurrency”的缩写,指的是Java并发工具包,它位于java.util.concurrent
包及其子包中。JUC包提供了大量用于构建并发应用程序的工具和类,这些工具和类使得开发者能够更容易地编写多线程和并发程序。接下来就让我们深入研究一下JUC的一些常用类吧
目录
一、Callable接口
二、ReentrantLock
三、同步器
1、信号量 Semaphore
2、CountDownLatch
四、并发集合
1、CopyOnWrite 集合类
2、ConcurrentHashMap
总结
一、Callable接口
我们还记得Runnable接口是用来描述一个任务的,Callable接口也是类似的,不同的是,Callable允许任务在执行结束后返回一个结果,并且可以抛出异常。相当于把线程封装了一个返回值,通过这种方式,可以灵活地处理异步任务,并获得它们的执行结果。
Callable<V>
接口是一个泛型接口,其中V
代表任务执行后返回的结果类型。Callable
接口只有一个抽象方法call()
,这个方法返回一个V
类型的值,并且可以抛出异常。
public interface Callable<V> {V call() throws Exception;
}
Callable接口与Runnable接口的区别
- 有返回值:
Callable
接口的任务可以返回一个结果,而Runnable
接口的任务没有返回值。- 可以抛出异常:
Callable
接口的call()
方法可以声明抛出异常,而Runnable
接口的run()
方法不可以抛出检查异常。- FutureTask:
Callable
任务通常会被包装成FutureTask
对象,以便于获取任务的执行结果。Runnable
任务则不需要这样的包装,可以直接提交给Executor
执行。
为了方便理解,让我们看一下下面这个情景:
创建一个线程,让这个线程计算1+2+3+...+1000
如果是采用之前Runnable的方式的话,为了得到执行结果,我们需要定义一个成员变量,来负责存储数据:
public class Main {private static int result;public static void main(String[] args) throws InterruptedException {Thread t=new Thread(()->{for (int i = 0; i < 1000; i++) {result+=i;}});t.start();t.join();System.out.println(result);}
}
这样程序就会引入了新的依赖,主线程和 t 线程与成员变量之间的耦合度就会比较高,不利于后期代码维护。这时使用Callable就能很好地解决这一问题:
public class Main {public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<Integer> callable=new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result=0;for (int i = 0; i < 1000; i++) {result+=i;}return result;}};FutureTask<Integer> futureTask=new FutureTask<>(callable);Thread t=new Thread(futureTask);t.start();System.out.println(futureTask.get());}
}
在上述代码中,我们使用FutureTask来包装一个Callable任务,并在线程 t 中执行它。通过调用get()方法,我们可以在主线程中等待任务完成并获取其结果。
注意:
futureTask 的 get() 方法具有阻塞功能,当 t 线程未执行完时,会进入阻塞等待,直到 t 线程执行完毕后才会返回结果。可以这么理解,get() 就相当于 “带有返回结果” 的 join()
Callable
接口的任务也可以直接通过ExecutorService
提交执行,并通过Future
对象获取结果:
public class CallableExample {public static void main(String[] args) {// 创建一个Callable接口的实现类Callable<Integer> callableTask = () -> {int result = 0;for (int i = 0; i < 1000; i++) {result += i;}System.out.println("计算结束");return result;};// 创建一个ExecutorServiceExecutorService executor = Executors.newSingleThreadExecutor();// 提交Callable任务并获取Future对象Future<Integer> future = executor.submit(callableTask);// 获取任务的结果try {Integer result = future.get(); // get()方法会阻塞,直到任务完成System.out.println("计算结果: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 关闭ExecutorServiceexecutor.shutdown();}
}
二、ReentrantLock
特点:
- 可中断的等待:当线程等待锁时,可以响应中断请求。
- 公平与非公平锁:公平锁按照请求锁的顺序来获取锁,而非公平锁可能会让后来的请求抢占先前请求的锁。
- 条件变量:
ReentrantLock
提供了Condition
对象作为条件变量,用于更精细的线程间同步。- 可重入性:线程可以多次获取同一个锁而不会造成死锁。
- 显式的锁释放:必须显式地调用
unlock()
方法来释放锁,否则可能会导致资源泄露。
基本使用:
1、创建实例
public class LockExample {private final ReentrantLock lock = new ReentrantLock();
}
这里创建了一个默认的非公平锁。如果你想创建一个公平锁,可以传递一个参数给构造函数:
private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
2、锁的获取与释放
lock.lock(); // 获取锁
try {// 执行临界区代码
} finally {lock.unlock(); // 释放锁
}
注意:
在
finally
块中释放锁是非常重要的,这样可以确保即使在发生异常的情况下锁也能被正确释放。
3、tryLock机制
ReentrantLock
提供了一个tryLock
方法,它试图获取锁而不阻塞。如果锁可用,则立即获取锁并返回true
;如果锁不可用,则不获取锁并返回false,通过返回值来反馈加锁是否成功,可以由程序员自行决定后续操作,而不是一味地“死等”,提供了更高的可操作空间。
if (lock.tryLock()) {try {// 执行临界区代码} finally {lock.unlock();}
} else {// 锁不可用时的操作
}
4、唤醒机制
ReentrantLock
通过newCondition()
方法创建Condition
对象,用于实现更复杂的同步控制。Condition
对象提供了比 Object
类的 wait()
和 notify()
方法更高级的线程同步功能,包括更精确的线程唤醒控制和多个等待集的支持。
基本方法
- await():使当前线程进入等待状态,并释放锁。当其他线程调用相应的
signal()
或signalAll()
方法唤醒该线程时,它会重新竞争获取锁并继续执行。- signal():唤醒在当前条件队列上等待的一个线程。被唤醒的线程会重新竞争获取锁,并继续执行。
- signalAll():唤醒在当前条件队列上等待的所有线程。被唤醒的线程会重新竞争获取锁,并继续执行。
private final Condition condition = lock.newCondition();// 通知其他等待线程
lock.lock();
try {condition.signal();
} finally {lock.unlock();
}// 等待通知
lock.lock();
try {condition.await();
} catch (InterruptedException e) {Thread.currentThread().interrupt();
} finally {lock.unlock();
}
ReentrantLock
和 synchronized的区别:
-
使用方式:
synchronized
是 Java 语言的关键字,是 JVM 内部实现的(⼤概率是基于 C++ 实现),它直接集成在 Java 语言的语法结构中,可以用作修饰符来声明方法或代码块。ReentrantLock
是java.util.concurrent.locks
包中的一个类,在 JVM 外实现的(基于 Java 实现),它需要显式地创建实例,并调用lock()
和unlock()
方法来获取和释放锁。
-
显式/隐式锁释放:
synchronized
会在线程退出作用域时自动释放锁,无需显式地调用unlock()
方法。ReentrantLock
必须显式地调用unlock()
方法来释放锁,如果没有释放锁就会导致资源泄露。
-
可中断等待:
synchronized
不支持中断等待,如果一个线程因为等待锁而被阻塞,则无法响应中断请求。ReentrantLock
支持中断等待,可以通过tryLock
方法尝试获取锁,如果锁不可用则可以选择性地返回,也可以通过lockInterruptibly()
方法等待锁,此时可以响应中断请求。
-
公平锁/非公平锁:
synchronized
只能是非公平锁。ReentrantLock
支持选择公平锁或非公平锁,可以通过构造⽅法传⼊⼀个 true 开启公平锁模式。公平锁保证了按照请求锁的顺序来获取锁,而非公平锁可能会让后来的请求抢占先前请求的锁。
-
唤醒机制:
synchronized
是通过Object
类的wait()
和notify()
方法来实现等待-唤醒的功能,但每次只能随机唤醒一个等待的线程。ReentrantLock
通过Condition
类实现等待-唤醒,可以更精确控制唤醒某个指定的线程。
-
可重入性:
synchronized
和ReentrantLock
都支持可重入性,即同一线程可以多次获取同一个锁而不会造成死锁。
-
锁的粒度控制:
synchronized
锁定的是整个方法或代码块,没有提供更细粒度的锁定控制。ReentrantLock
可以通过不同的锁对象来实现更细粒度的控制。
-
性能考虑:
- 在某些情况下,
synchronized
可能由于 JVM 的优化而比ReentrantLock
更快。 ReentrantLock
因为其灵活性和高级特性,在复杂情况下可能比synchronized
更适合。
- 在某些情况下,
如何选择使⽤哪个锁?
- 锁竞争不激烈的时候, 使⽤ synchronized, 效率更⾼, ⾃动释放更⽅便.
- 锁竞争激烈的时候, 使⽤ ReentrantLock, 搭配 trylock 更灵活控制加锁的⾏为, ⽽不是死等.
- 如果需要使⽤公平锁, 使⽤ ReentrantLock
基于 ReentrantLock 实现生产者消费者模式:
public class ProducerConsumerExample {private LinkedList<Integer> queue = new LinkedList<>();private int capacity = 10;private final ReentrantLock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public void put(int value) throws InterruptedException {lock.lock();try {// 如果队列已满,则等待while (queue.size() == capacity) {notFull.await();}// 生产数据queue.addLast(value);System.out.println("Produced: " + value);// 唤醒消费者notEmpty.signal();} finally {lock.unlock();}}public int take() throws InterruptedException {lock.lock();try {// 如果队列为空,则等待while (queue.isEmpty()) {notEmpty.await();}// 消费数据int value = queue.removeFirst();System.out.println("Consumed: " + value);// 唤醒生产者notFull.signal();return value;} finally {lock.unlock();}}
}
在上述代码中,我们使用 ReentrantLock
来保护对队列的访问,并使用两个 Condition
对象 notFull
和 notEmpty
来分别控制队列满和队列空的情况。生产者在队列满时等待,消费者在队列空时等待,只有当条件满足时才会唤醒相应的线程继续执行
三、同步器
1、信号量 Semaphore
信号量通常用于控制对共享资源的访问次数或者限制系统中的某个操作可以同时进行的最大次数,用来表示“可用资源的个数”,本质上就是一个计数器。Semaphore
提供了许可(permit)的概念,允许一定数量的线程获取许可,超过这个数量的线程将被阻塞,直到有许可被释放。
Semaphore的主要方法:
acquire()
:获取一个许可。如果没有可用的许可,则当前线程会被阻塞,直到有许可可用。release()
:释放一个许可,将其返回给信号量。availablePermits()
:返回当前可用的许可数量。drainPermits()
:从信号量中获取所有可用的许可,并将它们释放回信号量。reducePermits(int reduction)
:减少信号量的许可数量。tryAcquire()
:尝试获取一个许可。如果没有可用的许可,则立即返回false
。tryAcquire(long timeout, TimeUnit unit)
:尝试获取一个许可,但只等待指定的时间。如果在等待时间内没有可用的许可,则立即返回false
。
其中获取资源被称之为 P 操作,会让·“计数器”-1;释放资源则被称之为 V 操作,会让“计数器”+1
创建实例:
public class SemaphoreExample {public static void main(String[] args) {Semaphore semaphore = new Semaphore(5); // 初始许可数量为5}
}
当许可数量为1时,甚至可以用它来实现 “锁” 的功能,因为此时当线程 t1 获取当资源时,此时剩余资源就会为零,当其它线程申请资源时就会进入阻塞等待,等到 t1 执行完毕释放资源时,其它资源才有机会竞争获取资源,从而达到了类似于加锁的操作:
public class Main {private static int count=0;public static void main(String[] args) throws InterruptedException {//创建 Semaphore 实例Semaphore semaphore=new Semaphore(1);Thread t1=new Thread(()->{for(int i=0;i<5000;i++){try {//申请资源semaphore.acquire();count++;//释放资源semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2=new Thread(()->{for(int i=0;i<5000;i++){try {//申请资源semaphore.acquire();count++;//释放资源semaphore.release();} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}
相比于传统的互斥锁,Semaphore 可以自由地设定资源数量,更加具有灵活性,可以很容易地应用于多种并发场景中,还是十分有用的。
2、CountDownLatch
实际开发中,很多时候需要把一个大任务,拆分成多个小任务,一般会采用多线程/线程池来执行,但在多个线程的并发执行下,该如何判断总任务是否完成了呢?这时候就需要用到 CountDownLatch 了。
就好比跑步比赛时,需要每位选手都就位后,发令枪才会响起,所有选手都通过终点,才能公布成绩。CountDownLatch 提供了一个用于协调多个线程的计数器,通常用于等待一组操作完成之后再继续执行其他操作。它的主要用途是在一个或多个线程完成一系列初始化工作之前,阻止其他线程的进展。
主要方法:
await()
:阻塞当前线程,直到计数器的值变为零或线程被中断。await(long timeout, TimeUnit unit)
:阻塞当前线程,直到计数器的值变为零、线程被中断或等待时间到达指定的超时时间。countDown()
:递减计数器的值。getCount()
:返回当前的计数器值。
实例创建:
public class CountDownLatchExample {public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(3); // 初始计数值为3}
}
使用原理:CountDownLatch
维护了一个内部计数器,当计数器减为零时,在 await()
方法上等待的线程将被释放。计数器的值由创建 CountDownLatch
实例时提供的构造函数参数确定。线程可以通过调用 countDown()
方法来递减计数器的值。
简单应用:
public class Main {public static void main(String[] args) throws InterruptedException {ExecutorService executorService= Executors.newFixedThreadPool(4);//构造方法里的数字,就是拆分出来任务的个数CountDownLatch countDownLatch=new CountDownLatch(20);for(int i=1;i<=20;i++){int id=i;executorService.submit(()->{System.out.println("第"+id+"个任务开始执行");try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("第"+id+"个任务执行完毕");//代表一个任务执行完成countDownLatch.countDown();});}//当 CountDownLatch 收到 20 个完成时,就代表所有任务都完成了//await -> all waitcountDownLatch.await();System.out.println("所以任务执行完毕!");}
}
应用场景:
- 等待一组操作完成:当一个线程需要等待一组操作全部完成之后再继续执行时,可以使用
CountDownLatch
。- 多线程初始化:在多线程环境中,如果有多个线程负责初始化不同的部分,主初始化线程可以使用
CountDownLatch
来等待所有初始化线程完成它们的工作。- 定时任务:如果需要等待一段时间后执行某个任务,可以使用带有超时时间的
await
方法。
注意:
CountDownLatch
的计数器是不可重置的,一旦计数器归零就不能再增加。如果你需要重复使用类似的功能,可以考虑使用CyclicBarrier
。- 当使用
await
方法时,如果当前线程被中断,将会抛出InterruptedException
并且中断状态会被设置。
四、并发集合
像我们之前介绍的一些集合类,比如ArrayList、Queue、HashMap等,⼤部分都不是线程安全的。像Vector, Stack, HashTable, 是线程安全的,但并不建议直接去使用,因为加锁是需要一定的开销的,盲目加锁反而会影响程序效率。
1、CopyOnWrite 集合类
此时别的线程执行读操作,仍然可以读取旧版本的内容,而写操作都是在新版本的内容上修改,避免了之前会读到修改了一半的数据的情况发生,也就避免了线程安全问题的产生。
基本使用:
public class CopyOnWriteArraySetExample {public static void main(String[] args) {CopyOnWriteArraySet<Integer> set = new CopyOnWriteArraySet<>();// 添加元素set.add(1);set.add(2);set.add(2); // 重复元素不会被添加// 输出元素for (Integer number : set) {System.out.println(number);}// 删除元素set.remove(2);// 检查是否包含某个元素boolean containsOne = set.contains(1);System.out.println("Contains One: " + containsOne);}
}
- 线程安全:所有的写入操作都是通过复制整个数组来实现的。
- 读取性能:读取操作不需要加锁,性能较好。
缺点:
- 内存消耗:每次写入操作都需要复制整个数组,这会导致额外的内存消耗。
- 写入性能:写入操作的性能较低,特别是在集合较大时,复制整个数组的成本较高。
- 不支持并发修改:虽然
CopyOnWrite
集合类支持在迭代过程中修改集合,但不能支持真正的并发修改,因为写入操作会阻塞读取操作。
适用场景:
- 读多写少:如果应用程序主要是读操作,偶尔有一些写操作,那么
CopyOnWrite
集合类非常适合,因为读操作不需要加锁。- 集合大小较小:如果集合的大小不是特别大,那么
CopyOnWrite
集合类的性能影响是可以接受的。
2、ConcurrentHashMap
虽然有线程安全的Hashtable或Collections.synchronizedMap(),但它们加锁的方式比较“粗暴”,运行效率往往不高。JUC中就提供了一个 ConcurrentHashMap 类,它在多线程环境下就有更高的性能,因为它采用了分段锁(segmentation)技术,而不是在整个映射上加锁。这意味着多个线程可以同时进行读取操作,甚至在某些条件下可以同时进行写入操作。
基本使用:
创建一个 ConcurrentHashMap
实例非常简单,可以直接使用默认构造器或者提供初始容量和负载因子。
public class ConcurrentHashMapExample {public static void main(String[] args) {// 创建一个默认容量和负载因子的ConcurrentHashMapConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();// 添加键值对map.put("one", 1);map.put("two", 2);// 获取值Integer value = map.get("one");System.out.println("Value of 'one': " + value);// 删除键值对map.remove("two");}
}
优势
(1)优化了锁的粒度
像Hashtable 的加锁,就是直接在 put、get 等方法上套上 synchronized,不管当前操作是否会引起线程安全问题都会一味加锁,任何一个针对这个哈希表的操作都会引起锁竞争,效率就比较低下了。
而ConcurrentHashMap则是给每个Hash表中的“链表”都进行加锁,形成一个“锁桶”,这样只有在同时发生两次 “哈希冲突” ,且冲突的是同一个链表上的元素时才会加锁,而我们知道,一个正常的哈希表发生 “哈希冲突” 的概率是非常低的,因此会触发上述锁竞争的概率是非常非常低的,并且一个“链表”加锁之后,也完全不会影响对其它“链表”结点的操作,就可以大幅减少加锁次数,提高效率。
(2)引入了CAS原子操作
ConcurrentHashMap
提供了大量原子操作方法,如 computeIfAbsent
, computeIfPresent
, merge
等,这些方法可以在并发环境下安全地执行计算并更新值。
// 使用computeIfAbsent方法,如果不存在键对应的值,则计算并放入
Integer newValue = map.computeIfAbsent("three", key -> 3);
System.out.println("Value of 'three': " + newValue);
这样对于一些可能会引发线程安全问题的修改操作直接借助CAS完成,并不需要加锁,这使得只有在一些需要全局同步的操作中,例如扩容和转换链表为红黑树,才需要使用 synchronized
加锁,也就进一步减少了加锁的操作。关于CAS操作的详细叙述,可以看看博主的 CAS的原理及应用 一文。
(3)volatile 字段
为了确保线程间的可见性, ConcurrentHashMap
使用了 volatile
字段来修饰关键的共享变量。volatile
变量的值会被强制立即写入主内存,当线程读取 volatile
变量时,总是能读取到最新的值。
(4)优化了哈希的扩容机制
传统的 Hash表扩容,需要创建一个新的 Hash表,将原来表中所有元素搬运过去,这些操作很可能会在一次put中就完成了,就会导致这次的 put 操作开销非常大,在用户的视角下就会出现“卡”了一下的感觉,体验是非常不好的。
ConcurrentHashMap则实现了“化整为零”,不会在一次操作中进行所有数据的搬运,而是一次只搬一部分,之后每次操作都会触发一部分 key 的搬运,最终吧所有的key都搬运完
当新旧表同时存在时:
- 插入操作,就会直接插入到新的空间中
- 查询/修改/删除,会同时访问新的空间与旧的空间
至于线程池、原子类等JUC常见类在博主之前的文章 CAS的原理及应用、深入剖析线程池的应用中有过详细的研究,这里就不做赘述了。
总结
通过使用java.util.concurrent
包中的工具,我们可以灵活地处理异步任务,并获得它们的执行结果。无论是使用ExecutorService
来管理线程池中的任务,还是使用FutureTask
来包装Callable
任务,抑或是使用ReentrantLock
来在锁竞争激烈时灵活控制加锁操作等,这些工具都极大地简化了并发编程的工作量,并提高了程序在多线程环境下的性能和可靠性。
那么本篇文章就到此为止了,如果觉得这篇文章对你有帮助的话,可以点一下关注和点赞来支持作者哦。作者还是一个萌新,如果有什么讲的不对的地方欢迎在评论区指出,希望能够和你们一起进步✊