一、ReetrantLock的使用示例
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
new Thread(ClassLayOutTest::reentrantLockDemo, "threadA").start();
Thread.sleep(1000);
new Thread(ClassLayOutTest::reentrantLockDemo, "threadB").start();
Thread.sleep(1000);
new Thread(ClassLayOutTest::reentrantLockDemo, "threadC").start();
}public static void reentrantLockDemo() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ": 获取到锁! -->" + System.currentTimeMillis() / 1000);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
二、流程分析
上面的有三个线程都会去执行reentrantLockDemo()方法,在抢占到锁后会睡眠5S。然后在线程A启动后先睡眠1秒再启动B线程,线程B启动后睡眠1秒再启动线程C。这样就能保证了线程ABC按照固定顺序去抢占,那么程序执行的顺序是:
1、线程A抢占锁资源
线程A直接过来后,由于没有锁还没有被线程抢占。可以直接抢占到锁,线程A直接通过CAS将state的值由0修改为1,head及tail为null
2、线程B抢占失败进入AQS队列
线程B执行加锁操作的时候,由于锁已经被线程A占有了,所以没抢占到,执行进入AQS队列操作。具体的操作就是把自己封装成一个Node对象
放入到双向队列的时候,前面需要有一个伪头节点(节点中thread为null,waitStatus = -1),B放入进入后对应的node节点属性(thread = 线程B,waitStatus = 0)
3、线程B抢占失败,继续进入到AQS队列
线程C过来后,由于线程A已经抢占到锁资源,并且其业务代码需要执行5s中,那么此时线程c也无法抢到锁,需要进行进入AQS队列操作。在线程C进入AQS队列后,AQS目前现在应该有3个节点,伪节点 + Node(B) + Node(C)三个节点,状态如下所示:
最终我们看到示意图如下:
三、代码分析
1、lock方法分析
1.1、执行lock方法后,公平锁和非公平锁的执行方法不一样
// 非公平锁的实现 final void lock() {// 上来先尝试使用CAS的方法将0设置为1if (compareAndSetState(0, 1))//获取锁资源成功后,将当前线设置给属性字段exclusiveOwnerThreadsetExclusiveOwnerThread(Thread.currentThread());else// 尝试获取到1个资源acquire(1); }// 公平锁
final void lock() {acquire(1); }
1.2、acquire方法分析,这里的公平锁和非公平锁的逻辑是相同的
public final void acquire(int arg) {
// tryAcquire:再次查看,当前线程是否可以尝试获取锁资源
if (!tryAcquire(arg) &&
// 没有拿到锁资源
// addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾
// acquireQueued:查看我是否是第一个排队的节点,如果是可以再次尝试获取锁资源,如果长时间拿不到,挂起线程
// 如果不是第一个排队的额节点,就尝试挂起线程即可
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断线程的操作
selfInterrupt();
}
1.3 、tryAcquire方法,,分为公平锁和非公平锁
// 非公平锁竞争锁资源逻辑 final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 获取当前state属性值 int c = getState();// state等于0的时候,说明之前持有锁的线程已经释放了锁资源if (c == 0) {// 基于CAS尝试将state由0设置为1if (compareAndSetState(0, acquires)) {// 设置互斥锁的拥有者为当前线程setExclusiveOwnerThread(current);return true;}}// 如果当前线程跟之前锁的拥有者是同一个,那么此处就是锁的重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;// 重入成功后,需要检查是否溢出if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 设置statu加1的值setState(nextc);return true;}return false; }-------------------------------------------------------------------------------------------------------------------------
// 公平锁实现逻辑
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 队列为空或者当前线程排在队列的第一位置if (!hasQueuedPredecessors() &&// 尝试竞争一次锁资源compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 锁重入逻辑else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }
1.4、addWaiter方法分析
在没有拿到锁资源的时候,把线程放入到AQS队列中去
// 没有拿到锁资源,过来排队, mode:代表互斥锁private Node addWaiter(Node mode) {// 将当前线程封装为Node,Node node = new Node(Thread.currentThread(), mode);// 拿到尾结点Node pred = tail;// 如果尾结点不为nullif (pred != null) {// 当前节点的prev指向尾结点node.prev = pred;// 以CAS的方式,将当前线程设置为tail节点if (compareAndSetTail(pred, node)) {// 将之前的尾结点的next指向当前节点pred.next = node;return node;}}// 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾enq(node);return node;}-----------------------------------------------------------------------------------------------------------------
private Node enq(final Node node) {
for (;;) {
// 拿到尾结点
Node t = tail;
// 如果尾结点为空,AQS中一个节点都没有,构建一个伪节点,作为head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 比较熟悉了,以CAS的方式,在AQS中有节点后,插入到AQS队列的末尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.5、acquireQueued方法
判断当前线程是否还能再次尝试获取锁资源,如果不能再次获取锁资源,或者又没获取到,尝试将当前线程挂起
// 当前没有拿到锁资源后,并且到AQS排队了之后触发的方法。 中断操作这里不用考虑
final boolean acquireQueued(final Node node, int arg) {
// 不考虑中断
// failed:获取锁资源是否失败(这里简单掌握落地,真正触发的,还是tryLock和lockInterruptibly)
boolean failed = true;
try {
boolean interrupted = false;
// 死循环…………
for (;;) {
// 拿到当前节点的前继节点
final Node p = node.predecessor();
// 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源。
if (p == head && tryAcquire(arg)) {
// 获取锁资源成功
// 设置头结点为当前获取锁资源成功Node,并且取消thread信息
setHead(node);
// help GC
p.next = null;
// 获取锁失败标识为false
failed = false;
return interrupted;
}
// 没拿到锁资源……
// shouldParkAfterFailedAcquire:基于上一个节点转改来判断当前节点是否能够挂起线程,如果可以返回true,
// 如果不能,就返回false,继续下次循环
if (shouldParkAfterFailedAcquire(p, node) &&
// 这里基于Unsafe类的park方法,将当前线程挂起
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 在lock方法中,基本不会执行。
cancelAcquire(node);
}
}
// 获取锁资源成功后,先执行setHead
private void setHead(Node node) {
// 当前节点作为头结点 伪
head = node;
// 头结点不需要线程信息
node.thread = null;
node.prev = null;
}// 当前Node没有拿到锁资源,或者没有资格竞争锁资源,看一下能否挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// -1,SIGNAL状态:代表当前节点的后继节点,可以挂起线程,后续我会唤醒我的后继节点
// 1,CANCELLED状态:代表当前节点以及取消了
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 上一个节点为-1之后,当前节点才可以安心的挂起线程
return true;
if (ws > 0) {
// 如果当前节点的上一个节点是取消状态,我需要往前找到一个状态不为1的Node,作为他的next节点
// 找到状态不为1的节点后,设置一下next和prev
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 上一个节点的状态不是1或者-1,那就代表节点状态正常,将上一个节点的状态改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2、tryLock(time,unit);方法分析
2.1、跟lock方法相似
// tryLock(time,unit)执行的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
// 线程的中断标记位,是不是从false,别改为了true,如果是,直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire分为公平和非公平锁两种执行方式,如果拿锁成功, 直接告辞,
return tryAcquire(arg) ||
// 如果拿锁失败,在这要等待指定时间
doAcquireNanos(arg, nanosTimeout);
}private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果等待时间是0秒,直接告辞,拿锁失败
if (nanosTimeout <= 0L)
return false;
// 设置结束时间。
final long deadline = System.nanoTime() + nanosTimeout;
// 先扔到AQS队列
final Node node = addWaiter(Node.EXCLUSIVE);
// 拿锁失败,默认true
boolean failed = true;
try {
for (;;) {
// 如果在AQS中,当前node是head的next,直接抢锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 结算剩余的可用时间
nanosTimeout = deadline - System.nanoTime();
// 判断是否是否用尽的位置
if (nanosTimeout <= 0L)
return false;
// shouldParkAfterFailedAcquire:根据上一个节点来确定现在是否可以挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 避免剩余时间太少,如果剩余时间少就不用挂起线程
nanosTimeout > spinForTimeoutThreshold)
// 如果剩余时间足够,将线程挂起剩余时间
LockSupport.parkNanos(this, nanosTimeout);
// 如果线程醒了,查看是中断唤醒的,还是时间到了唤醒的。
if (Thread.interrupted())
// 是中断唤醒的!
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.2、取消节点分析
// 取消在AQS中排队的Node
private void cancelAcquire(Node node) {
// 如果当前节点为null,直接忽略。
if (node == null)
return;
//1. 线程设置为null
node.thread = null;//2. 往前跳过被取消的节点,找到一个有效节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;//3. 拿到了上一个节点之前的next
Node predNext = pred.next;//4. 当前节点状态设置为1,代表节点取消
node.waitStatus = Node.CANCELLED;// 脱离AQS队列的操作
// 当前Node是尾结点,将tail从当前节点替换为上一个节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 到这,上面的操作CAS操作失败
int ws = pred.waitStatus;
// 不是head的后继节点
if (pred != head &&
// 拿到上一个节点的状态,只要上一个节点的状态不是取消状态,就改为-1
(ws == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
// 上面的判断都是为了避免后面节点无法被唤醒。
// 前继节点是有效节点,可以唤醒后面的节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 当前节点是head的后继节点
unparkSuccessor(node);
}node.next = node; // help GC
}
}
3、 lockInterruptibly方法 分析
// 这个是lockInterruptibly和tryLock(time,unit)唯一的区别
// lockInterruptibly,拿不到锁资源,就死等,等到锁资源释放后,被唤醒,或者是被中断唤醒
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 中断唤醒抛异常!
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 这个方法可以确认,当前挂起的线程,是被中断唤醒的,还是被正常唤醒的。
// 中断唤醒,返回true,如果是正常唤醒,返回false
return Thread.interrupted();
}