Java源码学习之高并发编程基础——AQS源码剖析之线程间通信之条件等待队列

1.前言&目录

前言:

在Java中,使用synchronized关键字构建的锁,线程间通信可以使用某对象实例的wait/notify机制完成。AQS同样也提供了一套线程间通信的解决方案——条件等待队列。

在AQS源码分析的两篇文章AQS源码分析(上)、AQS源码分析(下)中,我们知道了,无论是独占锁模式还是共享锁模式,AQS提供的能力是将获取不到锁的线程将它们封装成链表节点的形式组织起来进行同步等待。

AQS也提供了如wait/notity等机制,它就是条件等待队列,队列元素是Condition——条件,条件的实例对象是ConditionObject。AQS的条件等待队列是一个单向队列,它的节点和AQS同步等待队列的节点是同一个类,都是AbstractQueuedSynchronizer.Node,目的就是为了条件等待节点最终能并入AQS同步等待队列。

以下就是AQS同步队列与条件等待队列模型的关系图:

c7e9a7dd50c143b3a672199099041073.png

条件等待节点最终会并入AQS同步队列中,意味着当前在等待条件的线程将重新进入AQS同步队列排队竞争锁。接下来,还是会以源码讲解的形式深入理解AQS中的条件等待。 

目录:

1.前言&目录

2.AQS条件使用场景

3.AQS条件源码剖析

3.1 ConditionObject条件实例

3.1 await()方法

3.2 signal()方法

3.3 AQS条件总结

4.简单案例

5.总结

2.AQS条件使用场景

AQS的条件是用作线程间通信的,一般来说多应用于生产者/消费者模型中,如果你使用的是如ReentrantLock等继承AQS实现的独占锁,若需要线程间通信就需要到条件。

在生产者/消费者模型中,生产者线程创建的商品不是无限制可以创建的, 它们是受到库存容量的限制的,消费者线程消费的商品也是有限的,最多能消费生产出来的商品。

这种模型,在阻塞队列比较常见,如LinkedBlockingQueue、ArrayBlockingQueue、LinkedBlockingDeque,它们的某些增加、获取元素方法使用到了AQS的条件等待,目标就是实现一定条件下的”阻塞“等待。

3.AQS条件源码剖析

AQS条件的源码解读,主要分三部分:熟悉条件等待队列模型、掌握关键的等待、释放方法。

3.1 ConditionObject条件实例

 工欲善其事,必先利其器,在学习掌握AQS条件的源码之前,我们必须先了解条件等待队列的模型——ConditionObject,它底层也是像AQS一样,由Node节点组成的队列,它是单向链表,AQS是双向链表。

ConditionObject有Node firstWaiter、Node lastWaiter两个成员变量,分别表示条件队列的头节点和尾节点。

public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer{public class ConditionObject implements Condition {// 队列头元素private transient Node firstWaiter;// 队列尾元素private transient Node lastWaiter;// 实现条件等待的方法public final void await() throws InterruptedException {...}// 唤醒条件等待节点的方法public final void signal() {...}}
}

并且它的两个await()、signal()等方法分别表示实现条件等待和唤醒条件等待,掌握这两个方法对理解条件等待节点是怎么并入AQS同步等待队列是非常重要的。

3.1 await()方法

await是AQS内部类ConditionObject的方法,它的作用用一句话概括就是,添加条件等待节点到队列,并将自己阻塞起来直到被唤醒,然后加入AQS同步等待队列。

往细一点说, 一共是下面的步骤:

  • 通过addConditionWaiter()方法,将当前线程封装为Node节点(下文称条件等待节点),该节点waitStatus是CONDITION(-2),接着将该条件等待节点添加到ConditionObject条件等待队列的尾部去。
  • 调用fullyRelease(node)方法释放当前线程持有的独占锁,为什么需要在这里释放呢?原因是条件需要和独占锁配合使用,这种情况通常是生产者/消费者模型。
  • 自旋检查当前条件等待节点是否在AQS的同步队列中,如果不是则说明此时的条件等待节点还没有并入、接入AQS同步队列中,会将该当前线程阻塞起来。唤醒的时机是同一个ConditionObject实例对象调用了signal()方法。
  • 如果被唤醒了,则会进入acquireQueued方法,这个方法在AQS阻塞队列上文中介绍过,该方法是将获取不到独占锁的线程进行自旋操作:二次获取锁和经过最多两次阻塞预判会阻塞当前线程——会一直阻塞到其他持有独占锁的线程主动释放锁。
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer{public class ConditionObject implements Condition{public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 往条件等待队列添加绑定了当前线程的等待节点,并返回它Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 释放当前线程持有的独占锁int interruptMode = 0;// 只要当前条件等待节点没有存在于AQS同步队列时,就将其阻塞起来while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 当条件等待节点被唤醒后其实会被添加到同步队列尾部,因此在这里会进入acquireQueued// 方法重新自旋获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}}
}

 经过上面的步骤划分,是否想起了熟悉的脉络?当某个线程拿到了独占锁后进入到同一时刻只能有一个线程访问的代码区域,如果处理背景不是生产者/消费者模型,则在最后的finally语句块释放锁即可。

但是对于生产者/消费者模型来说,其特点是,当库存商品已经达到上限了,就需要停止生产并通知消费者过来将库存消耗到上限值以下。当消费者将库存商品都一扫而空了,就需要停止消费并通知生产者重新投入生产中。

即在独占锁锁住的代码区域中处理的是生产者/消费者模型时,就需要通过一些手段或者机制将当前持有锁的线程进行阻塞(停止生产或消费)、唤醒阻塞线程(重写投入生产/消费)。

那么这里所讲的await方法就是应对于当库存商品已经达到上限了,需要暂时将生产者线程停止生产的情况,并通知消费者线程开始消费。

3.2 signal()方法

await方法用于阻塞当前线程并将其加入条件等待队列,signal方法就是负责将条件等待队列的节点接入到AQS的同步队列中。

 signal是这么将条件等待队列节点并入到AQS的同步队列中的:

  • 调用isHeldExclusively()方法,判断当前线程持有独占锁才能执行后面的并入操作。
  • 将条件等待队列的头节点获取并调用doSignal方法,在这里会先做转移和删除无用的节点,什么算是无用节点呢?答案在transferForSignal方法里,当要转移的条件等待节点的waitStatus不是CONDITION(-2)时,说明它可能已经被取消了(CANCELLED)。
  • transferForSignal方法首先做的就是将条件等待节点的waitStatus从CONDITION转换为0,目的是将它视为正常加入AQS同步等待队列的节点一样(等锁节点初始化时waitStatus都是0)。然后,通过AQS#enq(node)方法将该条件等待节点通过自旋的CAS操作添加到AQS同步队列尾部,注意返回的变量p其实是刚添加节点的前驱节点,这里做了一个额外保障:如果其前驱节点被取消了或者无法通过CAS更新其waitStatus为SIGNAL,则会直接将该条件等待节点绑定的线程唤醒。
        public final void signal() {if (!isHeldExclusively()) // 判断当前执行signal()方法的线程是否是持有独占锁的线程throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)// 找到队列头节点进行从条件节点并入AQS同步等待队列doSignal(first);}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 找到可用的一个条件等待节点将其并入AQS同步队列时退出doSignal方法} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {// 条件等待节点的预期状态是CONDITION,不是的话则直接退出将其并入AQS同步队列if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 自旋添加到AQS同步队列尾部 ,并返回其前驱节点Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 如果刚添加节点的前驱节点被取消了或者无法通过CAS更新为SIGNAL,则// 直接唤醒新添加的节点去竞争锁   LockSupport.unpark(node.thread);return true;}

signal方法的使用场景就是当消费者线程消耗了库存商品,此时库存容量也空出来了,就通过该方法去通知生产者线程重新投入生产。

3.3 AQS条件总结

条件需要搭配独占锁使用,通常应用于生产者/消费者模型中,目的是保证生产总额和消耗总额的一个动态平衡。

根据前面两个小节,可以总结出AQS同步队列和条件等待队列的模型关系如下:

c7e9a7dd50c143b3a672199099041073.png

当持有独占锁的线程,假设它是生产者线程,它发现此时库存容量已经达到最大容量了,再生产的商品也堆积不下,此时它就需要停止生产,即将自己持有的独占锁释放并加入到条件等待队列中去,等待消费者去消费,把库存的商品数量减到能继续投入生产为止。

当消费者把库存都一扫而空了,就要通知生产者赶紧生产补货,同时消费者也将阻塞自己并让出持有的独占锁,其过程跟生产者如出一辙。

4.简单案例

在这里我们通过一个简单的生产者/消费者模型去复盘并验证上面章节所讲述的。

在下面简单的生产者/消费者模型中,定义了超市的最大库存量为3、有5个生产者线程、有2个消费者线程,按照上个章节的分析,当生产者线程生产的商品已经达到阈值时,就会进入阻塞状态,直到消费者线程消费了商品后才能通知生产者线程重新投入生产。

注意notifyConsumer方法和notifyProduce方法中使用的是signalAll()方法,其实signalAll是将条件等待队列中的所有节点都并入AQS同步队列中,signal每次只能操作一个条件等待节点。

那么让我们运行以下的main函数,看看结果是否符合预期?

public class MyAqsConditionTest {private static volatile int maxGoodCount = 3; // 任一时刻超时库存最大容量private static List<Integer> supermarket = new ArrayList<>();public static void main(String[] args) {ReentrantLock produceLock = new ReentrantLock(true);ReentrantLock consumeLock = new ReentrantLock(true);Condition produceCondition = produceLock.newCondition();Condition consumeCondition = consumeLock.newCondition();for (int i=0;i<5;i++){new Thread(new ProduceRunnable(i, produceLock ,produceCondition, consumeLock, consumeCondition)).start();}for (int i=0;i<5;i++){new Thread(new ConsumeRunnable(produceLock ,produceCondition, consumeLock, consumeCondition)).start();}}static class ProduceRunnable implements Runnable {private int goodsNum;private  ReentrantLock produceLock ;private  ReentrantLock consumeLock ;private  Condition produceCondition ;private  Condition consumeCondition ;// ...构造函数@Overridepublic void run() {produceLock.lock();try {int currentSize = supermarket.size();if (currentSize >= maxGoodCount){// 当前超时商品容量已经达到上限,无法继续生产,需要进行阻塞System.out.println("生产者:当前商品数量已经达到库存上限,需要等待消费者线程消费");produceCondition.await();}supermarket.add(goodsNum);System.out.println("生产者线程创建了商品"+goodsNum);}catch (Exception e){}finally {produceLock.unlock();}// 如果此时超时有货了就通知消费者线程if (supermarket.size()>0){notifyConsumer(consumeLock, consumeCondition);}}}static class ConsumeRunnable implements Runnable {private  ReentrantLock produceLock ;private  ReentrantLock consumeLock ;private  Condition produceCondition ;private  Condition consumeCondition ;// ...构造函数@Overridepublic void run() {consumeLock.lock();try {int currentSize = supermarket.size();if (currentSize == 0){// 此时超市商品卖光了,阻塞当前消费者线程直到生产者将商品生产出来consumeCondition.await();}int buyGoodsNum = supermarket.remove(supermarket.size()-1);System.out.println("消费者线程买到了商品"+buyGoodsNum);}catch (Exception e){}finally {consumeLock.unlock();}if (supermarket.size() < maxGoodCount){notifyProduce(produceLock, produceCondition);}}}// 由生产者线程调度,当此时库存商品超过上限了,通知消费者线程消费static void notifyConsumer(ReentrantLock consumeLock, Condition consumeCondition){try {consumeLock.lock();consumeCondition.signalAll();}finally {consumeLock.unlock();}}// 由消费者线程调度,当此时超时库存讲到阈值以下了,通知生产者线程投入生产static void notifyProduce(ReentrantLock produceLock, Condition produceCondition){try {produceLock.lock();produceCondition.signalAll();}finally {produceLock.unlock();}}
}

输出结果符合预期,具体如下所示,生产者线程的确最多只能同时生产3个商品,等消费者线程消费以后才能继续投入生产。即AQS的await()方法成功的将持有独占锁的线程阻塞了起来,signalAll方法也在合适的时机唤醒了在条件等待队列中的被阻塞的线程。

-------输出结果-------
生产者线程创建了商品0
生产者线程创建了商品2
生产者线程创建了商品3
生产者:当前商品数量已经达到库存上限,需要等待消费者线程消费
消费者线程买到了商品3
生产者线程创建了商品4
消费者线程买到了商品4
生产者线程创建了商品1
-------输出结果-------

5.总结

AQS的条件一般和独占锁配合使用,一般不能单独使用,目的是为了持有独占锁的线程能在合适的时机阻塞自己或者唤醒其他等待条件(唤醒)的线程。 

这种情况一般用于生产者/消费者模型的并发编程模型,用于解决多线程环境下的资源共享问题。生产者和消费者都访问一个共享的数据存储区域,但是同一个时机只能有一个线程能操作该数据存储区域,否则就出现竞争条件和数据不一致的问题。

因此,AQS不仅仅提供了锁同步等待功能,还推出了await/signal的通信机制,目的就是为了解决在这样的背景下线程间通信的问题。并且从源码分析上看,比起synchronized/wait/notity

的组合拳来说,此机制还是相对公平的。因为在AQS同步阻塞队列(上)一文中就分析过,synchronized关键字创建的锁是非公平锁,不是先来后到的机制。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415766.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

逻辑器件输出高阻态时,输出端口的电平是什么状态呢?

高阻态是逻辑器件输出端口的一种状态&#xff0c;当端口处于高阻态时&#xff0c;输入端口的电平变化不会引起输出端口变化&#xff0c;不会对与之相连的后级输入端口或总线产生影响&#xff0c;对于总线架构的电路极为重要。   输出端口处于高阻态时&#xff0c;输出端口处于…

优秀软件工程师的工作思维

引言 在快速迭代的软件开发领域&#xff0c;软件工程师不仅需要精通编程技术&#xff0c;还需要具备产品思维、技术思维和工程思维&#xff0c;这三种思维相辅相成&#xff0c;共同推动产品的成功。本文将借鉴陈春花等管理学者的思考方式&#xff0c;深入剖析软件工程师如何在…

数据恢复工具,电脑+手机双端,十分好用!

哈喽&#xff0c;各位小伙伴们好&#xff0c;我是给大家带来各类黑科技与前沿资讯的小武。 今天给大家安利两款数据恢复工具&#xff0c;分别为电脑手机双端&#xff0c;无论是因为格式化误操作、设备损坏还是其他意外情况&#xff0c;都能轻松找回重要的文件、照片、视频等数…

什么是串口服务器?

1.什么是串口服务器&#xff1f; 了解串口服务器之前&#xff0c;我们需要先了解什么串口。 串口&#xff1a;又叫串行数据接口&#xff0c;主要是用来表示传递各种的数据的通信接口&#xff0c;通常指COM口。一般分为RS232、RS422、与RS485三种。RS232接口&#xff1a;采用全…

Datawhale X 李宏毅苹果书 AI夏令营 Task_1深度学习详解入门

目录 一、机器学习的基本概念 二、机器学习的主要任务类型 三、案例学习&#xff08;以视频的点击次数预测为例&#xff09; 四、梯度下降问题 一、机器学习的基本概念 机器学习&#xff0c;顾名思义&#xff0c;是让机器具备学习的能力。具体来说&#xff0c;机器学习就是…

ASP.NET MVC+LayUI视频上传完整教程

前言 前段时间在使用APS.NET MVCLayUI做视频上传功能的时&#xff0c;发现当上传一些内存比较大的视频就会提示上传失败&#xff0c;后来通过查阅相关资料发现.NET MVC框架为考虑安全问题&#xff0c;在运行时对请求的文件的长度&#xff08;大小&#xff09;做了限制默认为4M…

维信小程序禁止截屏/录屏

一、维信小程序禁止截屏/录屏 //录屏截屏,禁用wx.setVisualEffectOnCapture({visualEffect:hidden});wx.setVisualEffectOnCapture(Object object) 测试安卓手机&#xff1a; 用户截屏&#xff0c;被禁用 用户录屏&#xff0c;录制的是空白内容/黑色内容的视频。 二、微信小…

一种常用嵌入式开发代码库

链接&#xff1a;https://gitee.com/zhangxinyuanqi/varch 使用开源协议&#xff1a;GPL-2.0 varch简介 varch&#xff08;we-architecture&#xff0c;意为我们的框架库&#xff09;是嵌入式C语言常用代码模块库&#xff0c;包含了嵌入式中常用的算法库, 数据结构&#xff…

【云原生系列之SkyWalking的部署】

1、分布式链路追踪 1.1概念 在较大的web集群和微服务环境中&#xff0c;客户端的一次请求需要经过不同的模块&#xff0c;多个不同中间件&#xff0c;多个不同机器一起相互协作才能处理完成客户端的请求&#xff0c;而在这一系列的请求过程之中,处理流程可能是串行执行,也可能…

SprinBoot+Vue实验室考勤管理微信小程序的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue3.6 uniapp代码 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平…

【笔试练习】深信服校园招聘c/c 软件开发H卷

题目链接 一、填空题 如图所示&#xff0c;平面上有两条平行的线段&#xff0c;上面的线段有A0~A3 4个点&#xff0c;下面的线段有B0到B5 6个点&#xff0c;现在需要把所有的点都连接起来&#xff0c;有如下约束&#xff1a; 每个端点&#xff0c;都至少有一条到另一平行线上端…

HTML+CSS+Query实现二级菜单

在网页设计中&#xff0c;导航菜单是非常重要的部分之一&#xff0c;尤其是具有二级下拉菜单的导航栏&#xff0c;可以提升用户体验。本文将通过HTML、CSS和jQuery实现一个具有二级菜单标题的导航栏&#xff0c;并详细讲解每一步的实现过程。 <!DOCTYPE html> <html …

TS 学习(一)

如果我们在 ts 中写 不用运行就能在文件中报错 ts 是一种静态类型的检查 能将运行时出现的错误前置 一般不用 命令行编译 ts 转换成 js 将中文转码 tsc index&#xff08;.ts&#xff09; 输入命令生成 配置文件 能在中间进行 配置转换成 js 的哪个规范 es5 还是 6 和其它转…

JavaScript编程语言的学习

一、JavaScript介绍 JavaScript 是一种轻量级的脚本语言。所谓“脚本语言”&#xff0c;指的是它不具备开发操作系统的能力&#xff0c;而是只用来编写控制其他大型应用程序的“脚本”。 JavaScript 是一种嵌入式&#xff08;embedded&#xff09;语言。它本身提供的核心语法不…

数分基础(06)商业分析四种类型简介

文章目录 1. 商业分析2. 四种类型2.1 描述性分析和诊断性分析2.1.1 加载Global_Superstore数据集2.1.2 描述性分析2.1.3 诊断性分析2.1.4 再进一步各地区的订单数量和平均订单金额按客户群体分析销售额和利润折扣率和利润产品类别和子类别的销售和利润 本小节小结 2.2 销售预测…

在众多编程工具中,哪一个最能提高你的生产力?

随着软件开发行业的快速发展&#xff0c;开发者们需要使用多种工具来管理代码、调试应用程序、测试功能、以及处理数据库操作。每一个环节都可能会影响到整个项目的进展和最终质量&#xff0c;因此选择合适的工具对于提高工作效率至关重要。在这篇文章中&#xff0c;我将从开发…

VMware16安装Win11虚拟机全步骤

目录 准备工作下载镜像安装镜像开启虚拟机安装虚拟机安装Win11成功 准备工作 1、虚拟机&#xff1a;VMware16.2.1&#xff08;建议使用VMware16版本&#xff0c;15可能不兼容&#xff09; 2、Windows11镜像 下载镜像 1、浏览器打开网址&#xff1a;I tell you 可以看到有三…

坐牢第三十四天(c++)

一.作业 1.栈的手写 #include <iostream> using namespace std; // 封装一个栈 class stcak { private:int *data; //int max_size; // 最大容量int top; // 下标 public:// 无参构造函数stcak();// 有参构造函数stcak(int size);// 拷贝构造函数stcak(const s…

MySQL数据库增删查改(基础)CRUD

CRUD 即增加 (Create) 、查询 (Retrieve) 、更新 (Update) 、删除 (Delete) 四个单词的首字母缩写。 1. 新增&#xff08;Create&#xff09; 1.1单行数据&#xff08;全列插入&#xff09; 比如说&#xff1a;创建一张学生表&#xff0c;有姓名&#xff0c;学号。插入两个学…

什么是科学碳目标(SBTI认证)

科学碳目标&#xff08;SBTI认证&#xff09;&#xff0c;这一绿色发展的璀璨明珠&#xff0c;是企业迈向可持续未来的重要里程碑。它不仅是全球环境信息研究中心(CDP)、联合国全球契约组织(UNGC)、世界资源研究所(WRI)与世界自然基金会(WWF)共同铸就的智慧结晶&#xff0c;更是…