Lock
Lock位于java.util.concurrent.locks包下,是一种线程同步机制,就像synchronized块一样。但是,Lock比synchronized块更灵活、更复杂。
1. Lock继承关系
2. 官方文档解读
3. Lock接口方法解析
public interface Lock {// 获取锁。如果锁不可用,则当前线程将出于线程调度目的而禁用,并处于休眠状态,直到获得锁为止。void lock();// 如果当前线程未被中断,则获取锁。如果锁可用,则获取锁并立即返回。// 如果锁不可用,出于线程调度目的,将禁用当前线程,该线程将一直处于休眠状态。// 下面两种情形会让当前线程停止休眠状态:// 1.锁由当前线程获取。// 2.其他一些线程中断当前线程,并且支持对锁获取的中断。// 当前线程出现下面两种情况时,将抛出InterruptedException,并清除当前线程的中断状态。// 1.当前线程在进入此方法时,已经设置为中断状态。// 2. 当前线程在获取锁时被中断,并且支持对锁获取中断。void lockInterruptibly() throws InterruptedException;// 尝试获取锁,如果锁处于空闲状态,则获取锁,并立即返回true。如果锁不可用,则立即返回false。// 典型用法:// 确保解锁前一定获取到锁// if (lock.tryLock()) {// try {// // manipulate protected state// } finally {// lock.unlock();// }// } else {// // perform alternative actions// }boolean tryLock();// 该方法为tryLock()的重载方法,两个参数分别表示为:// time:等待锁的最长时间// unit:时间单位// 如果在给定的等待时间内是空闲的并且当前线程没有被中断,则获取锁。如果锁可用,则此方法立即获取锁并返回true,如果锁不可用,出于线程调度目的,将禁用当前线程,该线程将一直处于休眠状态。// 如果指定的等待时间超时,则返回false值。如果时间小于或等于0,则该方法永远不会等待。// 下面三种情形会让当前线程停止休眠状态:// 1.锁由当前线程获取。// 2.其他一些线程中断当前线程,并且支持对锁获取的中断。// 3.到了指定的等待时间。// 当前线程出现下面两种情况时,将抛出InterruptedException,并清除当前线程的中断状态。// 1.当前线程在进入此方法时,已经设置为中断状态。// 2.当前线程在获取锁时被中断,并且支持对锁获取中断。boolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 释放锁,与lock()、tryLock()、tryLock(long , TimeUnit)、lockInterruptibly()相对应。void unlock();// 返回绑定到此锁实例的Condition实例。当前线程只有获得了锁,才能调用Condition实例的方法。Condition newCondition();
}
ReentrantLock
ReentrantLock位于java.util.concurrent(J.U.C)包下,是Lock接口的实现类,属于独占锁。可重入特性与synchronized相似,但拥有扩展的功能。
ReentrantLock表现为API层面的互斥锁,通过lock()和unlock()方法完成,是显式的,而synchronized表现为原生语法层面的互斥锁,是隐式的。
在JDK 1.6之后,虚拟机对于synchronized关键字进行整体优化后,在性能上synchronized与ReentrantLock已没有明显差距,因此在使用选择上,需要根据场景而定,大部分情况下我们依然建议是synchronized关键字,原因之一是使用方便语义清晰,二是性能上虚拟机已为我们自动优化。而ReentrantLock提供了多样化的同步特性,如超时获取锁、可以被中断获取锁(synchronized的同步是不能中断的)、等待唤醒机制的多个条件变量(Condition)等,因此当我们确实需要使用到这些功能是,可以选择ReentrantLock
ReentrantLock都是把具体实现委托给内部类(Sync、NonfairSync、FairSync)
1. 可重入性
可重入性:任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,synchronized和Reentrant都是可重入的,隐式显式之分。
实现条件:
- 锁需要去识别获取锁的线程是否是当前占据锁的线程,如果是的话,就成功获取。
- 锁获取一次,内部锁计数器需要加一,释放一次减一,计数为零表示为成功释放锁。
ReentrantLock的重入计数是使用AbstractQueuedSynchronizer的state属性的,state大于0表示锁被占用,等于0表示空闲,小于0则是重入次数太多导致溢出了。
2. 公平锁模式和非公平锁模式
ReentrantLock的构造函数接受可选的公平参数,参数为true则表示获取一个公平锁,不带参数或者false表示一个非公平锁。
构造方法:
// 无参构造方法// 默认是非公平锁模式public ReentrantLock() {sync = new NonfairSync();}// 有参构造方法public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
公平性锁和非公平性锁父类:Sync
sync继承于AQS(AQS参考解析AQS实现原理_我不是欧拉_的博客-CSDN博客,CAS参考解析CAS原理_我不是欧拉_的博客-CSDN博客)
static abstract class Sync extends AbstractQueuedSynchronizer {
abstract void lock();// 非公平获取, 非公平锁都需要这个方法
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // state == 0表示无锁// 通过CAS的方式竞争锁,体现非公平性,任何线程来了不用排队都可以抢锁if (compareAndSetState(0, acquires)) { // 加锁成功,state状态改为1// 当前哪一个线程获取到锁,将线程信息记录到AQS里面// 设置当前持有锁的线程setExclusiveOwnerThread(current); return true; }} else if (current == getExclusiveOwnerThread()) {// 当前线程正是锁持有者,此段逻辑体现可重入性// 同一个线程可以在不获取锁的情况再次进入// nextc表示被加锁次数,即重入次数int nextc = c + acquires;if (nextc < 0) // 被锁次数上溢(很少出现)throw new Error("Maximum lock count exceeded");// 设置加锁次数,lock几次就要unlock几次,否则无法释放锁setState(nextc);return true;}return false;}// 释放protected final boolean tryRelease(int releases) {int c = getState() - releases;// 只有锁的持有者才能释放锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) { // 锁被释放free = true;setExclusiveOwnerThread(null);}setState(c);return free;}// 当前线程是否持有锁protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}final ConditionObject newCondition() {return new ConditionObject();}// 锁的持有者final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}// 加锁次数final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}// 是否上锁,根据state字段可以判断final boolean isLocked() {return getState() != 0;}
}
公平锁模式:FairSync
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}// 方法逻辑与Snyc中的nonfairTryAcquire方法一致,只是加了线程排队的逻辑protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// !hasQueuedPredecessors()// 判断队列中是否有其他线程,没有才进行锁的获取,否则继续排队// 体现公平性if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
非公平锁模式:NonfairSync
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {// 不管锁是否已经被占用,采用CAS的方式进行锁竞争// 抢锁成功,则把当前线程设置为活跃的线程// 抢锁失败则走acquire(1)逻辑// 体现非公平性,线程不必排队if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// 抢锁失败,调用tryAcquire方法,最终调用nonfairTryAcquire方法acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
3. condition
在并发编程中,每个Java对象都存在一组监视器方法,如wait()、notify()以及notifyAll()方法,通过这些方法,我们可以实现线程间通信与协作(也称为等待唤醒机制),如生产者-消费者模式、等待-通知模式,而且这些方法必须配合着synchronized关键字使用。
与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过多个Condition实例对象建立更加精细的线程控制,也就带来了更多灵活性了,我们可以简单理解为以下两点:
- 通过Condition能够精细的控制多线程的休眠与唤醒。
- 对于一个锁,我们可以为多个线程间建立不同的Condition。
Condition相关方法
// 返回绑定到此锁实例的Condition实例。当前线程只有获得了锁,才能调用Condition实例的方法。Condition newCondition();
public interface Condition {// 线程程进入等待状态直到被通知(signal)或中断// 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒// 当其他线程调用interrupt()方法中断当前线程// await()相当于synchronized等待唤醒机制中的wait()方法void await() throws InterruptedException;// 与wait()方法相同,唯一的不同点是,该方法不会再等待的过程中响应中断void awaitUninterruptibly();// 当前线程进入等待状态,直到被唤醒或被中断或超时// 其中nanosTimeout指的等待超时时间,单位纳秒long awaitNanos(long nanosTimeout) throws InterruptedException;// 同awaitNanos,但可以指明时间单位boolean await(long time, TimeUnit unit) throws InterruptedException;// 线程进入等待状态,直到被唤醒、中断或到达某个时// 间期限(deadline),如果没到指定时间就被唤醒,返回true,其他情况返回falseboolean awaitUntil(Date deadline) throws InterruptedException;// 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须// 获取与Condition相关联的锁,功能与notify()相同void signal();// 唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须// 获取与Condition相关联的锁,功能与notifyAll()相同void signalAll();
}
使用await之前必须加锁,使用signal、signalAll之后记得释放锁。
Condition实现原理
Condition的具体实现类是AQS的内部类ConditionObject,前面我们分析过AQS中存在两种队列,一种是同步队列,一种是等待队列,而等待队列就相对于Condition而言的。注意在使用Condition前必须获得锁,同时在Condition的等待队列上的结点与前面同步队列的结点是同一个类即Node,其结点的waitStatus的值为CONDITION。在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,firstWaiter代表等待队列第一个等待结点,lastWaiter代表等待队列最后一个等待结点,代码如下:
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */// 等待队列第一个等待结点private transient Node firstWaiter;/** Last node of condition queue. */// 等待队列最后一个等待结点private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/public ConditionObject() { }
}
每个Condition都对应着一个等待队列,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。Condition中的等待队列模型如下:
Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的,Condtion中等待队列的结点只有直接指向的后继结点并没有指明前驱结点,而且使用的变量是nextWaiter而不是next。
firstWaiter指向等待队列的头结点,lastWaiter指向等待队列的尾结点,等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。再次强调每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。
实现代码:
await方法:
public final void await() throws InterruptedException {// 判断线程是否被中断if (Thread.interrupted())throw new InterruptedException();// 创建新结点加入等待队列并返回Node node = addConditionWaiter();// 释放当前线程锁即释放同步状态int savedState = fullyRelease(node);int interruptMode = 0;// 判断结点是否同步队列(SyncQueue)中,即是否被唤醒while (!isOnSyncQueue(node)) {// 挂起线程LockSupport.park(this);// 判断是否被中断唤醒,如果是退出循环。if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// clean up if cancelledif (node.nextWaiter != null) // 清理等待队列中不为CONDITION状态的结点unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}private Node addConditionWaiter() {Node t = lastWaiter;// 判断是否为结束状态的结点并移除if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建新结点状态为CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);// 加入等待队列if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
signal()方法:
public final void signal() {// 判断是否持有独占锁,如果不是抛出异常// 从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Conditionif (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;// 唤醒等待队列第一个结点的线程if (first != null)doSignal(first);}private void doSignal(Node first) {do {// 移除条件等待队列中的第一个结点,// 如果后继结点为null,那么说没有其他结点将尾结点也设置为nullif ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 如果被通知节点没有进入到同步队列并且条件等待队列还有不为空的节点,则继续循环通知后续结点} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// transferForSignal方法
final boolean transferForSignal(Node node) {// 尝试设置唤醒结点的waitStatus为0,即初始化状态// 如果设置失败,说明当期结点node的waitStatus已不为// CONDITION状态,那么只能是结束状态了,因此返回false// 返回doSignal()方法中继续唤醒其他结点的线程,注意这里并// 不涉及并发问题,所以CAS操作失败只可能是预期值不为CONDITION,// 而不是多线程设置导致预期值变化,毕竟操作该方法的线程是持有锁的。if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 加入同步队列并返回前驱结点pNode p = enq(node);int ws = p.waitStatus;// 判断前驱结点是否为结束结点(CANCELLED=1)或者在设置// 前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 唤醒node结点的线程LockSupport.unpark(node.thread);return true;}
流程:signal()被调用后,先判断当前线程是否持有独占锁,如果有,那么唤醒当前Condition对象中等待队列的第一个结点的线程,并从等待队列中移除该结点,移动到同步队列中,如果加入同步队列失败,那么继续循环唤醒等待队列中的其他结点的线程,如果成功加入同步队列,那么如果其前驱结点是否已结束或者设置前驱节点状态为Node.SIGNAL状态失败,则通过LockSupport.unpark()唤醒被通知节点代表的线程,到此signal()任务完成。
注意被唤醒后的线程,将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,那么while (!isOnSyncQueue(node))将不在符合循环条件,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理,流程如下图所示(注意无论是同步队列还是等待队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)
流程图:
Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。如果采用Object类中的wait(),notify(),notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。
利用Condition实现等待-通知模式:
public class ABCThread implements Runnable {private ReentrantLock lock;private Condition sCondition;private Condition aCondition;public ABCThread(ReentrantLock lock, Condition sCondition, Condition aCondition) {this.lock = lock;this.sCondition = sCondition; // 唤醒下一个线程this.aCondition = aCondition; // 阻塞当前线程}@Overridepublic void run() {int i = 0;while (i < 10) {// 加锁lock.lock();try {// 接收到线程通知则继续执行,否则就阻塞aCondition.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.print(Thread.currentThread().getName()+" ");// 唤醒其他线程线程sCondition.signal();i++;// 释放锁lock.unlock();}} }ReentrantLock reentrantLock = new ReentrantLock();Condition ab = reentrantLock.newCondition();Condition bc = reentrantLock.newCondition();Condition ca = reentrantLock.newCondition();new Thread(new ABCThread(reentrantLock,ab,ca),"A").start();new Thread(new ABCThread(reentrantLock,bc,ab),"B").start();new Thread(new ABCThread(reentrantLock,ca,bc),"C").start();reentrantLock.lock();ca.signal();reentrantLock.unlock();
参考:Lock锁和ReentrantLock锁_resumebb的博客-CSDN博客_lock reentrantlock
深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理_w_s_h_y的博客-CSDN博客