目录
1.概述
1.1.什么是AQS
1.2.AQS和BlockQueue的区别
1.3.AQS的结构
2.源码分析
2.1.CLH队列
2.2.模板方法的实现
2.2.1.独占模式
1.获取资源
2.释放资源
2.2.2.共享模式
1.概述
1.1.什么是AQS
AQS非常非常重要,可以说是JAVA并发包(java.util.concurrent)的核心。
以下是一个常见的多线程场景:
当多个线程对同一个资源进行争抢时,没有抢到的线程怎么办?直接拒绝然后丢弃?最可行最合适的方式当然是让没有抢到资源的线程排队阻塞起来,这样即让出了CPU的资源,又方便当资源被释放、可争抢的时候唤醒线程来继续争抢。
我们发现一个信号量+一个线程安全的用来存放线程的队列,是并发编程体系的一个底座,这个底座有一个名字叫——同步器。
AQS(AbstractQueuedSynchronizer),一个JDK中提供的同步器,提供一系列线程的控制和同步机制,是JDK中诸多需要进行线程控制和同步的地方,诸如可重入锁ReentrantLock,以及诸多同步工具Semaphore、CountDownLatch、CyclicBarrier等,其底层都是使用的AQS来实现线程控制和同步。
1.2.AQS和BlockQueue的区别
阻塞队列(Blocking Queue)是一种特殊的队列,它的特殊之处在于它具有阻塞特性。当队列为空时,从队列中获取元素的线程会被阻塞,直到队列中有新的元素被加入;同样地,当队列满时,试图向队列中添加元素的线程也会被阻塞,直到队列中有空余的空间。这种特性使得阻塞队列非常适用于生产者-消费者模式中。
阻塞队列:
-
元素的读写是线程安全的
-
里面存的任务
-
阻塞的是外部的生产者线程/消费者线程
AQS,同步器,是用来进行线程控制和同步的一个基础组件,用来让现场排队、阻塞、唤醒等操作。
AQS:
-
元素的读写是安全的
-
里面存的是线程
-
阻塞的是内部存的线程
1.3.AQS的结构
AQS提供了什么?
一套API和一套数据结构。
可以看到AQS是一个抽象类:
既然是抽象类而不是直接是个接口就说明,其提供的不只是一个标准,而是一套资源。其底层用链表来组织其管理的线程,包含:
-
链表的节点Node
-
链表的头尾指针
-
状态state,这是个volatile变量用于表示同步状态,在不同的实现类中,它的含义不同,有可能表示锁是否被持有、持有者持有锁的次数、信号量可用的许可数量等。
-
条件变量ConditionObject
AQS提供了一套标准的API,来完成两方便的操作:
-
状态控制
-
资源争抢
状态控制:
-
getState():返回同步状态
-
setState(int newState):设置同步状态
-
compareAndSetState(int expect, int update):使用CAS设置同步状态
-
isHeldExclusively():当前线程是否持有资源 独占资源(不响应线程中断)
资源争抢和释放一共两种,共享模式、独占模式:
-
tryAcquire(int arg):独占式获取资源,子类实现
-
acquire(int arg):独占式获取资源模
-
tryRelease(int arg):独占式释放资源,子类实现
-
release(int arg):独占式释放资源模板 共享资源(不响应线程中断)
-
tryAcquireShared(int arg):共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现
-
acquireShared(int arg):共享式获取资源模板
-
tryReleaseShared(int arg):共享式释放资源,子类实现
-
releaseShared(int arg):共享式释放资源模板
独占模式出现在对一把锁进行争抢、释放的场景中,如ReentrantLock中;共享模式出现在对多个资源的争抢、释放的场景中,如ReadWriteLock、CountDownLatch中。
2.源码分析
2.1.CLH队列
在AQS的源码开头,就描述了其是使用CLH队列来组织没争抢到资源的线程的:
该队列是用链表实现的,没被争抢到资源的线程会被封装成Node,被挂入链表中。node的入队和出队都用的CAS(自旋锁)来保证效率和安全的兼得。
这个Node类源码很简单,主要是要注意Node是有状态的,用状态来控制该节点该不该被唤醒,用waitStatus来表示状态:
Node 类中的静态常量定义了以下状态:
-
SIGNAL (-1):表示当前节点的后续节点需要被唤醒。这是默认状态,当一个节点被添加到队列中时,它的 waitStatus 会被设置为 -1。
-
PROPAGATE (-3):类似于 SIGNAL,但在某些情况下,如释放共享模式下的锁时,可能会设置为 -3,以帮助传播唤醒信
-
CONDITION (-2):表示节点位于条件队列中,而不是同步队列中。这通常与 Condition 对象相关联。
-
0:表示没有特别的状态。当节点被初始化时,它的 waitStatus 默认为 0。
-
CANCELLED (1):表示节点已经被取消。这通常发生在线程被中断或者尝试获取锁失败时。一旦节点被标记为 CANCELLED,它就会从队列中移除。
waitStatus 的使用:
-
线程阻塞:当一个线程被插入到队列中时,它的 waitStatus 通常会被设置为 SIGNAL。这意味着当前节点的后续节点应该被唤醒,当当前节点释放时。
-
线程唤醒:当一个线程释放锁或其他同步状态时,它会检查自己的 waitStatus,并根据情况唤醒后继节点。例如,在独占模式下,当线程释放锁时,它会检查其后继节点的 waitStatus,如果是 SIGNAL,则会唤醒该节点对应的线程。
-
条件队列:当线程等待某个条件满足时,它会被移到条件队列中,并且其 waitStatus 会被设置为 CONDITION。当条件满足时,线程会被放回同步队列,并且 waitStatus 会被设置为 SIGNAL。
-
取消节点:如果线程被中断或超时,或者由于其他原因无法继续等待,那么该节点会被标记为 CANCELLED,并且从队列中移除。
整个CLH队列可以理解为:
2.2.模板方法的实现
2.2.1.独占模式
1.获取资源
AQS的实现类很多,此处我们以ReentrantLock的FairSync(公平锁)为例。
lock()其实是去,调用AQS的acquire():
AQS的acquire()会去调用实现类的tryAcquire()来判断当前线程是否能抢到线程,如果抢不到就入队CLH,并且阻塞:
至此我们就能体会到了,AQS实现了核心的阻塞+入队以及唤醒+出队的一些列对CLH中线程的操作,但具体的争抢策略作为模板开放出去了,也就是上文我们说的一系列模板方法。
我们来看看FairSync的tryAcquire()实现:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//获取锁的状态if (c == 0) {//如果锁没有被持有//就去判断CLH队列中还有没有线程,没有的话就去CAS获取锁if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前线程是锁的持有者就去增加锁的持有次数else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
2.释放资源
释放资源应该做些什么?这里我们想一下就能想出来:
-
首先释放锁
-
然后唤醒后面的线程。
释放锁:
可以看到ReentrantLock的unlock调用的AQS的release()去释放锁,AQS的release里会先去tryRelease()尝试释放锁,tryRelease()是交给子类重写的:
所以调用的就是reentrantLock的释放策略:
ReentrantLock的尝试释放的过程很简单,就是线程安全的去减少锁的持有次数+清除锁的持有者的信息:
protected final boolean tryRelease(int releases) {int c = getState() - releases;//检查当前线程是否是锁的所有者,确保只有锁的持有者能释放锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;//清除锁的持有者setExclusiveOwnerThread(null);}//刷新状态setState(c);return free;}
唤醒后面的线程:
继续回到AQS的release,继续往下看,tryRelease()后确定可以释放锁后,unparkSuccessor()唤醒CLH队列中需要被唤醒的第一个线程。为什么说是第一个需要被唤醒的喃?前文已经说过了Node节点有状态,状态用来表示该节点是否需要被唤醒,有些状态是表示节点不需要被唤醒的。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);//唤醒后面的需要被唤醒的线程return true;}return false;}
unparkSuccessor()很简单,修改当前节点状态,唤醒后面需要唤醒的状态:
private void unparkSuccessor(Node node) {//将当前的节点从待唤醒状态变为已经唤醒int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);
//找到后续第一个需要被唤醒的节点,然后唤醒它Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
2.2.2.共享模式
之所以有共享模式,是因为有时候资源不一定是唯一的,比如除了锁之外,可能要求能有10条线程同时工作,又或者读写之间不互斥等需求。在这种允许多条线程同时工作的情景中就要用到共享模式,标志位state不再表示一个意思(是否被持有),而是用来表示多个意思,有可能是线程的数量,也有可能是高低位分别表示是否允许读或者写。
共享模式下,资源的获取是通过acquireShared和tryAcquireShared来实现的,释放是通过releaseShared和tryReleaseShared来实现的。具体代码实现的话其实和独占模式大差不差,只是在对标志位state的操作上略有不同,以及可能会有一些位操作。以下是一些常见的使用共享模式的场景:
-
读写锁 (ReadWriteLock): 读写锁是一种常用的同步工具,它允许多个线程同时读取共享资源,但在写入时需要独占资源。 读锁通常使用共享模式实现,因为多个读线程可以同时访问资源。 写锁则使用独占模式,因为写入操作通常需要独占资源以避免数据不一致。
-
信号量 (Semaphore): 信号量用于限制可以同时访问共享资源的线程数量。 它可以通过减少或增加一个计数器来管理资源的可用性,这个计数器通常就是 AQS 的 state 字段。 当线程尝试获取信号量时,它会减少 state 的值;当线程释放信号量时,它会增加 state 的值。 因为多个线程可以同时获取信号量,所以信号量通常使用共享模式。
-
循环屏障 (CyclicBarrier): 循环屏障允许一组线程相互等待,直到所有线程都到达了一个共同的屏障点。 当最后一个线程到达屏障点时,所有线程都会被释放继续执行。 循环屏障通常使用共享模式来管理线程的等待和释放。
-
CountDownLatch: CountDownLatch 是一种同步辅助类,它允许一个或多个线程等待其他线程完成操作。 它维护了一个计数器,每当一个操作完成时,计数器就会减少。 当计数器达到零时,所有等待的线程都会被释放。 CountDownLatch 通常使用共享模式来管理计数器的减少。
-
Exchanger: Exchanger 允许两个线程交换对象。 两个线程各自提供一个对象并等待另一个线程到达交换点。 一旦两个线程都到达交换点,它们就可以交换对象并继续执行。 Exchanger 通常使用共享模式来管理线程的交换过程。 示例代码
由于这是一个系列文章,后面几篇文章我们就将聊到CountDownLatch、Semaphore、CyclicBarrier之类的同步工具类、ReadWriteLock之类的读写锁,到时候分析源码的时候,再来看看各自的具体实现。