独占锁和共享锁的概念
独占锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排他锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。
共享锁是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。
之前的分析了ReentrantLock,Semaphore, CountDownLatch这三个典型的aqs实现。其中ReentrantLock使用了独占模式,Semaphore和CountDownLatch是共享模式
Aqs独占锁
如下图所示:
只能有一个线程获取到锁,其它线程则每次加入到CLH尾部阻塞等待。
测试代码
static ReentrantLock reentrantLock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Runnable runnable = () -> {try {reentrantLock.lock();// 业务处理TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + ":" + " finished");} catch (Exception e) {e.printStackTrace();} finally {reentrantLock.unlock();}};List<Thread> threads = new ArrayList<>(4);for(int i= 0;i<6;i++) {Thread thread = new Thread(runnable, String.valueOf((char)('A' + i)));threads.add(thread);}for(Thread thread: threads){thread.start();}}
先理解一下aqs的Node状态
static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled. */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking. */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition. */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate.*/static final int PROPAGATE = -3;
CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
可以看到负值表示结点处于有效等待状态,而正值表示结点已被取消。
在java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
中
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
如果tryAcquire返回false, 则进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
操作
addWaiter(Node.EXCLUSIVE) 即addWaiter(null)
再来看释放锁的唤醒逻辑
当独占线程执行结束释放锁的成功过后,执行如下
显然当执行tryRelease成功后head不是null且其等待状态是唤醒状态-1; 所以能执行唤醒下一个节点的操作,即unparkSuccessor
方法
aqs unparkSuccessor(Node node)方法
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)node.compareAndSetWaitStatus(ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node p = tail; p != node && p != null; p = p.prev)if (p.waitStatus <= 0)s = p;}if (s != null)LockSupport.unpark(s.thread);}
首先设置了当前node等待状态为初始化值,然后判断下一节点是否是有效等待状态(>0
的判断); 接着
从等待队列的尾部开始往前依次判断每个节点是否是有效等待状态,最后找到的节点即时要进行唤醒的(执行 LockSupport.unpark(s.thread);
)操作
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)if (p.waitStatus <= 0)s = p;
为什么从尾到头的判断?
因为如队列是从尾部直接加入的,可能还没有设置上一节点的next操作, 有并发问题。可能是刚加入的新等待节点,这样从尾部判断就不会漏掉了
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize//队列为空需要初始化,创建空的头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;//set尾部节点if (compareAndSetTail(t, node)) {//当前节点置为尾部t.next = node; //前驱节点的next指针指向当前节点return t;}}}
}
Aqs共享锁
共享模式一般是有个资源初始值,然后多个线程共享使用