如何理解 Java 中的阻塞队列:从基础到高级的深度解析

提到阻塞队列,许多人脑海中会浮现出 BlockingQueueArrayBlockingQueueLinkedBlockingQueueSynchronousQueue。尽管这些实现看起来复杂,实际上阻塞队列本身的概念相对简单,真正挑战在于内部的 AQS(Abstract Queuing Synchronizer)。如果你对阻塞队列感到陌生,希望下面的内容能帮助你从全新角度理解它。


文章目录

      • 1、线程间通信
        • 2、线程间通信的实现
        • 2.1、轮询
        • 2.2、等待唤醒机制(wait/notify)
        • 2.3、等待唤醒机制(Condition)
      • 3、自定义阻塞队列
        • 4、Java 中的 BlockingQueue


1、线程间通信

线程间通信是指多个线程对共享资源的操作和协调。在生产者-消费者模型中,生产者和消费者是不同种类的线程,他们对同一个资源(如队列)进行操作。生产者负责向队列中插入数据,消费者负责从队列中取出数据。

主要挑战在于如何在资源达到上限时让生产者等待,而在资源达到下限时让消费者等待。线程间的这种相互调度,就是线程间通信。

以现实生活为例。消费者和生产者就像两个线程,原本做着各自的事情,厂家管自己生产,消费者管自己买,一般情况下彼此互不影响。900 240

image-20240808015130401

但当物资到达某个临界点时,就需要根据供需关系适当作出调整。比如,当厂家做了一大堆东西,产能过剩时,应该暂停生产,扩大宣传,让消费者过来消费。

image-20240808015015146

同理,当消费者发现某个热销商品售罄,应该提醒厂家尽快生产。

image-20240808015356312

在上面的案例中,生产者和消费者是不同种类的线程,一个负责存入,另一个负责取出,且它们操作的是同一个资源。但最难的部分在于:资源到达上限时,生产者等待,消费者消费;资源达到下限时,生产者生产,消费者等待。

我们可以发现,原本互不打扰的两个线程之间开始了 “沟通”:

  • 生产者:做的商品太多了,应该扩大宣传,让大家来买。
  • 消费者:都卖完啦,应当提醒商家尽快补货。

这种线程间的相互调度,也就是线程间通信。


2、线程间通信的实现

实现线程间通信的方式有多种:

  • 轮询:生产者和消费者线程通过循环不断检查队列的状态。这种方法简单,但会消耗大量 CPU 资源,且无法保证原子性。
  • 等待唤醒机制(wait/notify):通过 waitnotify 机制,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,但 notify 可能导致线程竞争不均。
  • 等待唤醒机制(Condition):使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。
2.1、轮询

设计理念:生产者和消费者线程通过循环不断检查队列的状态,队列为空时生产者才可插入数据,队列不为空时消费者才能取出数据,否则一律 sleep 等待。

image-20240808015823555

代码实现:

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;/*** 自定义阻塞队列实现:轮询版本* * @param <T> 队列中存储的元素类型*/
public class WhileQueue<T> {// 用来存储元素的容器private final LinkedList<T> queue = new LinkedList<>();// 队列的最大容量private final int MAX_SIZE = 1;/*** 将元素添加到队列中* * @param resource 要插入的元素* @throws InterruptedException 如果当前线程被中断*/public void put(T resource) throws InterruptedException {// 如果队列满了,生产者线程将进入轮询等待状态while (queue.size() >= MAX_SIZE) {System.out.println("生产者:队列已满,无法插入...");TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试}// 插入元素到队列的前面System.out.println("生产者:插入" + resource + "!!!");queue.addFirst(resource);}/*** 从队列中取出元素* * @throws InterruptedException 如果当前线程被中断*/public void take() throws InterruptedException {// 如果队列为空,消费者线程将进入轮询等待状态while (queue.size() <= 0) {System.out.println("消费者:队列为空,无法取出...");TimeUnit.MILLISECONDS.sleep(1000); // 线程等待1秒钟再重试}// 从队列的末尾取出元素System.out.println("消费者:取出消息!!!");queue.removeLast();TimeUnit.MILLISECONDS.sleep(5000); // 模拟消费操作需要时间}
}

测试:

/*** 测试类:创建生产者和消费者线程来测试WhileQueue的功能*/
public class Test {public static void main(String[] args) {// 创建一个WhileQueue实例WhileQueue<String> queue = new WhileQueue<>();// 创建并启动生产者线程new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 100; i++) {try {queue.put("消息" + i); // 插入消息到队列} catch (InterruptedException e) {e.printStackTrace(); // 捕获并打印中断异常}}}}).start();// 创建并启动消费者线程new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 100; i++) {try {queue.take(); // 从队列中取出消息} catch (InterruptedException e) {e.printStackTrace(); // 捕获并打印中断异常}}}}).start();}
}

由于设定了队列最多只能存1个消息,所以只有当队列为空时,生产者才能插入数据。这是最简单的线程间通信:多个线程不断轮询共享资源,通过共享资源的状态判断自己下一步该做什么。

但上面的实现方式存在一些缺点:

  • 轮询的方式太耗费 CPU 资源,如果线程过多,比如几百上千个线程同时在那轮询,会给 CPU 带来较大负担
  • 无法保证原子性(代码里没有演示,但理论上确实如此,如果生产者的操作非原子性,消费者极可能获取到脏数据)
2.2、等待唤醒机制(wait/notify)

相对而言,等待唤醒机制则要优雅得多,底层维护线程队列,线程可以在队列为空或满时阻塞自己,当状态改变时由其他线程唤醒。synchronized 保证了线程的原子性,同时避免了过多线程同时自旋造成的 CPU 资源浪费,颇有点用空间换时间的味道。

当一个生产者线程无法插入数据时,就让它在队列里休眠(阻塞),此时生产者线程会释放 CPU 资源,等到消费者抢到 CPU 执行权并取出数据后,再由消费者唤醒生产者继续生产。

Java 有多种方式可以实现等待唤醒机制,最经典的就是通过 waitnotify 的方式:

import java.util.LinkedList;/*** 自定义阻塞队列实现:使用 wait/notify* * @param <T> 队列中存储的元素类型*/
public class WaitNotifyQueue<T> {// 用来存储元素的容器private final LinkedList<T> queue = new LinkedList<>();// 队列的最大容量private final int MAX_SIZE = 1;/*** 将元素添加到队列中* * @param resource 要插入的元素* @throws InterruptedException 如果当前线程被中断*/public synchronized void put(T resource) throws InterruptedException {// 当队列满时,生产者线程进入等待状态while (queue.size() >= MAX_SIZE) {System.out.println("生产者:队列已满,无法插入...");this.wait(); // 释放锁,并进入等待状态}// 插入元素到队列的前面System.out.println("生产者:插入" + resource + "!!!");queue.addFirst(resource);this.notify(); // 唤醒等待的消费者线程}/*** 从队列中取出元素* * @throws InterruptedException 如果当前线程被中断*/public synchronized void take() throws InterruptedException {// 当队列为空时,消费者线程进入等待状态while (queue.size() <= 0) {System.out.println("消费者:队列为空,无法取出...");this.wait(); // 释放锁,并进入等待状态}// 从队列的末尾取出元素System.out.println("消费者:取出消息!!!");queue.removeLast();this.notify(); // 唤醒等待的生产者线程}
}

基于 waitnotify 的阻塞队列。其原理是通过同步机制和线程通信来处理生产者-消费者问题。在 put 方法中,生产者线程检查队列是否已满,如果已满,则调用 wait 使自己进入等待状态,释放锁,直到队列有空位。生产者在插入元素后调用 notify 唤醒可能等待的消费者线程。在 take 方法中,消费者线程检查队列是否为空,如果为空,则调用 wait 使自己进入等待状态,释放锁,直到队列有新元素。消费者在取出元素后调用 notify 唤醒可能等待的生产者线程。这种机制避免了忙等待,通过有效的线程通信提高了资源利用效率。

Ps:使用 notifyAll 在某些情况下可能更合适,尤其是当有多个生产者和消费者线程时。notifyAll 会唤醒所有等待的线程,而不仅仅是一个线程,这样可以保证系统中的所有线程都有机会被唤醒,避免了因线程唤醒不充分导致的潜在问题。

2.3、等待唤醒机制(Condition)

等待唤醒机制(wait/notify)版本的缺点是随机唤醒容易出现"己方唤醒己方",最终导致全部线程阻塞的乌龙事件,虽然 wait/notifyAll 能解决这个问题,但唤醒全部线程又不够精确,会造成无谓的线程竞争(实际只需要唤醒敌方线程即可)。

因此使用ReentrantLockCondition实现等待唤醒机制,可以更加精确地控制线程的阻塞和唤醒。通过创建不同的Condition实例,可以分别管理生产者和消费者的等待状态,避免了notify的随机唤醒问题。

作为改进版,可以使用 ReentrantLockCondition 替代 synchronizedwait/notify

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ConditionQueue<T> {// 容器,用来装东西private final LinkedList<T> queue = new LinkedList<>();private final int CAPACITY = 10; // 队列容量// 显式锁(相对地,synchronized锁被称为隐式锁)private final ReentrantLock lock = new ReentrantLock();private final Condition producerCondition = lock.newCondition();private final Condition consumerCondition = lock.newCondition();public void put(T resource) throws InterruptedException {lock.lock();try {while (queue.size() >= CAPACITY) {// 队列满了,不能再塞东西了,等待消费者取出数据System.out.println("生产者:队列已满,无法插入...");// 生产者阻塞producerCondition.await();}System.out.println("生产者:插入" + resource + "!!!");queue.addFirst(resource);// 生产完毕,唤醒消费者consumerCondition.signal();} finally {lock.unlock();}}public void take() throws InterruptedException {lock.lock();try {while (queue.size() <= 0) {// 队列空了,不能再取东西,等待生产者插入数据System.out.println("消费者:队列为空,无法取出...");// 消费者阻塞consumerCondition.await();}System.out.println("消费者:取出消息!!!");queue.removeLast();// 消费完毕,唤醒生产者producerCondition.signal();} finally {lock.unlock();}}
}

如何理解 Condition 呢?可以认为 lock.newCondition() 创建了一个队列,调用 producerCondition.await() 会把生产者线程放入生产者的等待队列中,当消费者调用producerCondition.signal() 时会唤醒从生产者的等待队列中唤醒一个生产者线程出来工作。

也就是说,ReentrantLockCondition 通过拆分线程等待队列,让线程的等待唤醒更加精确了,想唤醒哪一方就唤醒哪一方。


3、自定义阻塞队列

基于以上机制,我们可以自定义实现一个简单的阻塞队列。以下代码示例展示了一个基于 wait/notifyAll 实现的阻塞队列:

public class BlockingQueue<T> {private final LinkedList<T> queue = new LinkedList<>();private int MAX_SIZE = 1;private int remainCount = 0;public BlockingQueue(int capacity) {if (capacity <= 0) {throw new IllegalArgumentException("size最小为1");}this.MAX_SIZE = capacity;}public synchronized void put(T resource) throws InterruptedException {while (queue.size() >= MAX_SIZE) {this.wait();}queue.addFirst(resource);remainCount++;this.notifyAll();}public synchronized T take() throws InterruptedException {while (queue.size() <= 0) {this.wait();}T resource = queue.removeLast();remainCount--;this.notifyAll();return resource;}
}

4、Java 中的 BlockingQueue

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,继承自 Queue 接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。

BlockingQueue 阻塞队列在 Java 中的主要实现有三个:

  1. ArrayBlockingQueue: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。
  2. LinkedBlockingQueue: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。
  3. SynchronousQueue: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。

更多实现可以参考:Java 并发集合:阻塞队列集合介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/417105.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C:指针学习(1)-学习笔记

目录 前言&#xff1a; 知识回顾&#xff1a; 1、const 1.1 const修饰普通变量 1.2 const修饰指针变量 1.3 总结&#xff1a; 2、指针运算 2.1 指针-整数 2.2 指针-指针 2.3 指针的关系运算 3、指针的使用 结语&#xff1a; 前言&#xff1a; 距离上一次更新关于初…

MLM:多模态大型语言模型的简介、微调方法、发展历史及其代表性模型、案例应用之详细攻略

MLM&#xff1a;多模态大型语言模型的简介、微调方法、发展历史及其代表性模型、案例应用之详细攻略 目录 相关文章 AI之MLM&#xff1a;《MM-LLMs: Recent Advances in MultiModal Large Language Models多模态大语言模型的最新进展》翻译与解读 MLM之CLIP&#xff1a;CLIP…

基于Java+SpringBoot+Vue的新闻稿件管理系统

基于JavaSpringBootVue的新闻稿件管理系统 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345; 哈…

Mac 安装 jdk 8详细教程

Mac 电脑上安装Jdk 8 的步骤很简单&#xff0c;不用想Windows那样需要配置环境变量PATH、JAVA_HOME。 具体方法如下&#xff1a; 首先&#xff0c;去JDK官网下载对应版本的JDK 8。 这里需要注册一个账号&#xff0c;然后&#xff0c;账号下载。 下载完后&#xff0c;得到一个…

Golang | Leetcode Golang题解之第386题字典序排数

题目&#xff1a; 题解&#xff1a; func lexicalOrder(n int) []int {ans : make([]int, n)num : 1for i : range ans {ans[i] numif num*10 < n {num * 10} else {for num%10 9 || num1 > n {num / 10}num}}return ans }

04.【个人网站】如何使用Vercel部署网站

源码&#xff1a;https://github.com/Jessie-jzn/Jessie-Blog.dev 网站&#xff1a;https://www.jessieontheroad.com/ 详细介绍 Vercel 是一个前端开发平台&#xff0c;专注于提供简单、快速的部署和管理静态网站和前端框架应用&#xff08;例如 Next.js&#xff09;的服务。…

计算机基础知识复习9.5

数据交换 电路交换&#xff1a;交换信息的两个主机之间简历专用通道&#xff0c;传输时延小&#xff0c;实时性强&#xff0c;效率低&#xff0c;无法纠正错误。 报文交换&#xff1a;信息拆分成小包(报文&#xff09;大小无限制&#xff0c;有目的/源等信息提高利用率。有转…

制造业中工艺路线(工序)与产线(工作中心)关系

一.工艺路线与生产线是数字孪生中的虚实关系&#xff1a; 1.工艺路线为虚&#xff0c;生产线体为实&#xff1b; 2.工艺路线指导生产线的生产组织&#xff0c;生产线承载工艺路线的能力&#xff0c;把虚拟的生产信息流变成真实的产流。 二.工艺路线与生产线是数字孪生中互为“…

MATLAB 仿真跳频扩频通信系统

1. 简介 跳频扩频&#xff08;FHSS&#xff09;是一种通过在不同的频率之间快速切换来对抗窄带干扰的技术。在这篇博客中&#xff0c;我们将使用 MATLAB 进行 FHSS 通信系统的仿真&#xff0c;模拟跳频过程、调制、解调以及信号在不同步骤中的变化。通过对仿真结果进行可视化&…

Unity Xcode方式接入sdk

入口 创建 GameAppController 类 继承 UnityAppController 并且在类的实现之前 需要 加 IMPL_APP_CONTROLLER_SUBCLASS(GameAppController)&#xff0c;表明这个是程序的入口。UnityAppController 实现了 UIApplicationDelegate。 可以简单看下 UIApplicationDelegate 的生命周…

【时间盒子】-【5.绘制闹钟】动态绘制钟表和数字时间

Tips: Preview装饰器&#xff0c;支持组件可预览&#xff1b; Component装饰器&#xff0c;自定义组件&#xff1b; Canvas组件的使用&#xff1b; 使用RenderingContext在Canvas组件上绘制图形&#xff0c;请参考官方文档&#xff1a;https://developer.huawei.com/consume…

Unity(2022.3.41LTS) - UI详细介绍-Scroll View(滚动视图)

目录 零.简介 一、基本功能与用途 二、主要组件 Rect Transform&#xff08;矩形变换&#xff09;&#xff1a; Scroll Rect&#xff08;滚动矩形&#xff09;组件&#xff1a; Scrollbar&#xff08;滚动条&#xff09;组件&#xff1a; Mask&#xff08;遮罩&#xff…

【GD32】RT-Thread实时操作系统移植(GD32F470ZGT6)

1. 简介 最近几年可以发现国产的实时操作系统越来越受欢迎了&#xff0c;本篇要移植的就是当中的翘楚——RT-Thread。 RT-Thread诞生于2006年&#xff0c;是国内以开源中立、社区化发展起来的一款高可靠实时操作系统 &#xff0c;由睿赛德科技负责开发维护和运营 。并且在上一年…

超详细步骤——Keil MDK-ARM 如何修改工程名字

背景&#xff1a; 注意&#xff1a;本项目是基于 STM32 单片机的裸机程序&#xff0c;使用 STM32CubeMX 工具生成的 Keil MDK-ARM 工程。 目标&#xff1a; 在 Keil MDK-ARM 开发环境中&#xff0c;将名为version0805 的工程重命名为 version0910&#xff0c;并确保所有新编译…

文本怎么在线做成二维码?文字信息生成活码的制作方法

文本怎么做成二维码来展示呢&#xff1f;现在通过二维码分享信息的方式越来越常见&#xff0c;可以将文本二维码应用于许多的用途&#xff0c;比如人员信息、物品信息、通知内容、企业介绍等内容都可以生成二维码来展示。那么文本生成二维码该怎么生成呢&#xff1f;下面教大家…

达梦数据库的系统视图v$sysstat

达梦数据库的系统视图v$sysstat 在达梦数据库&#xff08;DM Database&#xff09;中&#xff0c;V$SYSSTAT 视图提供了关于数据库系统性能和状态的一系列统计信息。这个视图是数据库管理员&#xff08;DBA&#xff09;用来监控和管理数据库性能的重要工具之一。它包含许多统计…

CUDA统一内存:简化GPU编程的内存管理

CUDA统一内存&#xff1a;简化GPU编程的内存管理 在现代GPU编程中&#xff0c;内存管理一直是开发者面临的一个重要挑战。特别是在使用NVIDIA CUDA进行高性能计算时&#xff0c;如何在CPU和GPU之间高效地传输数据、以及如何管理这些数据的生命周期&#xff0c;都是影响程序性能…

postman注入csrf

示例脚本 参数配置位置 必要参数 django项目仅需要设置domain即可&#xff0c;比如www.baidu.com,baidu.com尽量域名精确避免修改到其他域的参数 必须把这个domain添加到 cookies->Manage cookies ->Domains Allowlist 中&#xff0c;否则cookie的注入失败 代码 // 必…

图像白平衡

目录 效果 背景 什么是白平衡&#xff1f; 实现原理 将指定图色调调整为参考图色调主要流程 示例代码 效果 将图一效果转换为图二效果色调&#xff1a; 调整后&#xff0c;可实现色调对换 背景 现有两张图像&#xff0c;色调不一致&#xff0c;对于模型重建会有影响。因…

Java体系中的继承

前言 #继承&#xff08;Inheritance&#xff09; 是面向对象编程&#xff08;OOP&#xff09;中的一个重要概念&#xff0c;它允许一个类&#xff08;称为子类或派生类&#xff09;可以从另一个类&#xff08;称为父类、基类或超类&#xff09;继承属性&#xff08;数据&#…