【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用

文章目录

  • 📄前言
  • 一. 阻塞队列初了解
    • 🍆1. 什么是阻塞队列?
    • 🍅2. 为什么使用阻塞队列?
    • 🥦3. Java标准库中阻塞队列的实现
  • 二. 阻塞队列的模拟实现
    • 🍚1. 实现普通队列
    • 🍥2. 实现队列的阻塞功能
    • 🧊3. 解除阻塞状态
  • 三. 使用模拟的阻塞队列验证生产者和消费者模型

📄前言

本文是对阻塞队列的应用场景的介绍,对阻塞队列的作用以及具体实现的讨论。


一. 阻塞队列初了解

🍆1. 什么是阻塞队列?

阻塞队列是一种带有阻塞功能的“先进先出”线性表。即在一个带有最大容量的队列中,在某时刻队列容量已满时继续入队 或 队列为空时继续出队,就会进入阻塞等待状态,直到队列变为 未满或非空 便解除阻塞状态,继续入队或出队。

🍅2. 为什么使用阻塞队列?

若存在以下简易的分布式系统:
在这里插入图片描述
上述分布式系统虽然能完成客户端与服务器端的交互需求,但可能存在以下问题:

  1. 在正常情况下,用户可以通过客户端想服务器发起请求并获取相应的服务,但假如在某刻服务器A突然出现了故障,与服务器A直接通信的服务器B也可能因此出现故障,导致整个服务瘫痪。
  2. 若未来想增加 更多的服务器 来处理服务器A发起的请求,则需求对 服务器A 的接口 进行一定的改动,付出一定的时间和人力成本。
  3. 当某个时刻,很多的客户端同时向 服务器A 发起请求,作为与用户直接交互的服务器,服务器A具备承载这些并发量的能力,但服务器集群中负责其他功能的服务器接收请求的承载能力可能较弱,此时可能造成其他服务器的崩溃。

造成上述现象的原因可以归结为以下两点:

  1. 模块间的耦合性较高(例如问题1和2)
  2. 承载能力较弱的模块不具备抗冲击能力。(例如问题3)

上述的解决方法是在服务器之间加入一个阻塞队列,利用生产者和消费者模型解决以上问题。
什么是生产者消费者模型呢?(如下图)
在这里插入图片描述

当服务器A接收来自客户端的请求时,不把请求直接发给服务器B,而是将请求数据加入到队列中,服务器B通过队列接收请求并把请求除了的结果返回给A。


当上述分布式系统引入阻塞队列后工作模式如下图所示:
在这里插入图片描述

引入阻塞队列的好处:

  1. 解耦合。当服务器A或服务器B出现问题时,就不会对其他服务器造成直接的影响;当需要添加新的服务器来处理这些请求时,新的服务器也同样只需从队列中取数据,无需对原有服务器的接口(代码)进行任何的改动。
  2. 削峰填谷”。当服务器A 瞬间接收客户端发来的大量请求时,由于服务器B处理请求的速度较慢,剩余的请求会在阻塞队列里面堆积,虽然客户端获取服务的时间相对增加了,但一定程度上缓解了其他承受并发量能力较弱的服务器的压力。

🥦3. Java标准库中阻塞队列的实现

在这里插入图片描述

BlockingQueue的主要方法:
在这里插入图片描述
方法演示如下:(使用普通入队方法入队4次,再使用带有阻塞的出队方法出队4次)

public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);System.out.println("数据 5 入队状态: " + q.offer(5));System.out.println("数据 6 入队状态: " + q.offer(6));System.out.println("数据 7 入队状态: " + q.offer(7));System.out.println("数据 8 入队状态: " + q.offer(8));System.out.print("队列中的数据: ");System.out.println(q);System.out.println("数据出队: ");for (int i = 0; i < 4; i++) {System.out.print(q.take() + " ");}System.out.println("程序结束 !");
}

在这里插入图片描述

可以发现,当调用 take()方法取出队列元素时,因为队列最终为空,程序进入了阻塞状态,没有打印“程序结束”。


二. 阻塞队列的模拟实现

🍚1. 实现普通队列

阻塞队列的关键方法是两个带有阻塞功能的 put() 和 take()方法,而这两个方法是在原有出入队方法上使用 Object类 带有wait()方法 和 notify() 方法让线程进入等待状态 或 唤醒线程。
因此,我们可以先把基础的队列进行实现,随后在原有基础上进行修改。队列可以使用数组(环形队列)或链表两种方式实现,这里我采用数组的方式实现队列。(由于队列的实现方法较为常见,这里直接给出实现代码)

class MyBlockingQueue<E> {private Object[] elem;private int defaultCapacity = 11;	// 阻塞队列默认容量private int front;	// 记录队头元素位置private int rear;	// 记录队尾元素位置private int size;   // 用于记录当前队列元素的实际个数public MyBlockingQueue(){this.elem = new Object[defaultCapacity + 1];}public MyBlockingQueue(int capacity) {defaultCapacity = capacity;this.elem = new Object[defaultCapacity + 1];}public boolean offer(E val) {// 判断队列是否已满if (size == defaultCapacity) {return false;}elem[rear] = val;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);return true;}public E poll() {// 判断队列是否为空if (front == rear) {return null;}Object ret = elem[front];size--;// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);return (E)ret;}
}

🍥2. 实现队列的阻塞功能

当阻塞队列容量已满时,调用 put() 方法会进入阻塞状态,因此在原先 offer()方法判断的基础上,我们需要使用 wait()方法 让线程进入阻塞等待状态,考虑到可能有多个线程同时调用 put()方法,可能会引起线程安全问题,因此我们应在 if()判断条件和整个修改操作上 加锁(或者直接在方法上加锁)。(代码如下)

public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);}
}

当队列为空时,调用 take() 方法会使线程进入阻塞状态,同理若判空条件成立,我们需要调用 wait() 方法使线程进入阻塞,为防止多个线程在队列即将为空时同时调用 take() 方法引发线程安全问题,我们需要在 if()判断语句 和 整个修改操作 进行加锁操作(或者直接在方法上加锁)。(代码如下)

public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;return (E)ret;}
}

🧊3. 解除阻塞状态

什么情况下队列会接触阻塞状态呢?

  1. 当队满时,某个线程从阻塞队列取出一个元素,即执行完出队操作后,需要使用 notify()方法 唤醒因执行 put()方法而阻塞的线程。
  2. 当队空时,某个线程向队列新增一个元素,即执行完入队操作后,需要使用 notify()方法唤醒因执行 take()方法而阻塞的线程。

对 put()方法和take()方法 修改后代码如下:

public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);// 此处的 notify 用来唤醒 队列为空时的 waitthis.notify();}
}public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;// 此处的 notify 用来唤醒 队列为满时的 waitthis.notify();return (E)ret;}
}

三. 使用模拟的阻塞队列验证生产者和消费者模型

为了方便看到效果,我们假设阻塞队列的容量为2,并将生产与消费的数据进行打印。
当生产者与消费者处理数据的频率一样,且生产速率为 次/1s、消费速率为 次/1s 时,程序的生产与消费数据应轮流打印:(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);// 生产者Thread producer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread consumer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述

当生产速率 > 消费速率,且生产速率为 次/1s、消费速率为 次/2s 时:可以预估到,当经过5s后程序会因队满进入阻塞状态,且后续每消费一次伴随着一次生产,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述

当生产速率 < 消费速率,且生产速率为 次/2s、消费速率为 次/1s 时:可以预估到,当经过2s后程序会因队满进入阻塞状态,且后续每生产一次伴随着一次消费,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)

public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}

在这里插入图片描述


以上就是本篇文章的全部内容了,如果这篇文章对你有些许帮助,你的点赞、收藏和评论就是对我最大的支持。
另外,文章可能存在许多不足之处,也希望你可以给我一点小小的建议,我会努力检查并改进。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/246264.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握Pyecharts:绘制炫酷词云图的参数解析与实战技巧【第36篇—python:词云图】

文章目录 安装Pyecharts基本的词云图绘制自定义词云图样式多种词云图合并高级词云图定制与交互1. 添加背景图片2. 添加交互效果 使用自定义字体和颜色从文本文件生成词云图总结&#xff1a; 在数据可视化领域&#xff0c;词云图是一种极具表现力和趣味性的图表&#xff0c;能够…

【DevOps】Jenkins Extended E-mail 邮件模板添加自定义变量

文章目录 1、配置Jenkins邮箱2、配置告警模板1、配置Jenkins邮箱 略 2、配置告警模板 自定义变量:DYSK_PYTEST_STATUS // Uses Declarative syntax to run commands inside a container. pipeline {agent {kubernetes {cloud "kubernetes" //选择名字是kuberne…

vivado DDS学习

实现DDS通常有两种方式&#xff0c;一种是读取ROM存放的正弦/余弦信号的查表法&#xff0c;另一种是用DDS IP核。这篇学习笔记中&#xff0c;我们要讲解说明的是VIVADO DDS IP核的应用。 目前本篇默认Phase Generator and SIN/COS LUT&#xff08;DDS&#xff09;的standard模式…

10.Elasticsearch应用(十)

Elasticsearch应用&#xff08;十&#xff09; 1.为什么需要聚合操作 聚合可以让我们极其方便的实现对数据的统计、分析、运算&#xff0c;例如&#xff1a; 什么品牌的手机最受欢迎&#xff1f;这些手机的平均价格、最高价格、最低价格&#xff1f;这些手机每月的销售情况如…

SpringCloud-Knife4j文档聚合

在微服务架构下&#xff0c;如果给每个微服务都配置文档&#xff0c;那么每个微服务的接口文档都有自己独立的访问地址&#xff0c;这样要一个个打开每个微服务的文档非常麻烦。一般我们会采用聚合的办法&#xff0c;将所有微服务的接口整合到一个文档中&#xff0c;具体做法有…

web前端项目-五子棋【附源码】

五子棋&#xff08;人机对弈&#xff09; 本项目【五子棋】是一款人机对弈的策略型棋类游戏。可以选择落子方&#xff1b;游戏难度和是否显示落子次序。游戏双方分别使用黑白两色的棋子&#xff0c;在棋盘直线与横线的交叉点上进行对弈。五子棋可以促进大脑发育、提高思维能力…

openlayers+vue实现缓冲区

文章目录 前言一、准备二、初始化地图1、创建一个地图容器2、引入必须的类库3、地图初始化4、给地图增加底图 三、创建缓冲区1、引入需要的工具类库2、绘制方法 四、完整代码总结 前言 缓冲区是地理空间目标的一种影响范围或服务范围,是对选中的一组或一类地图要素(点、线或面…

华为三层交换机之基本操作

Telnet简介 Telnet是一个应用层协议,可以在Internet上或局域网上使用。它提供了基于文本的远程终端接口&#xff0c;允许用户在本地计算机上登录到远程计算机&#xff0c;然后像在本地计算机上一样使用远程计算机的资源。Telnet客户端和服务器之间的通信是通过Telnet协议进行的…

[蓝桥杯]真题讲解:冶炼金属(暴力+二分)

蓝桥杯真题视频讲解&#xff1a;冶炼金属&#xff08;暴力做法与二分做法&#xff09; 一、视频讲解二、暴力代码三、正解代码 一、视频讲解 视频讲解 二、暴力代码 //暴力代码 #include<bits/stdc.h> #define endl \n #define deb(x) cout << #x << &qu…

C语言第十弹---函数(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 函数 1、函数的概念 2、库函数 2.1、标准库和头文件 2.2、库函数的使用方法 2.2.1、功能 2.2.2、头文件包含 2.2.3、实践 2.2.4、库函数文档的⼀般格式 …

PWM调光 降压恒流LED芯片FP7127:为照明系统注入新能量(台灯、GBR、调光电源、汽车大灯)

目录 一、降压恒流LED芯片FP7127 二、降压恒流LED芯片FP7127具有以下特点&#xff1a; 三、降压恒流LED芯片FP7127应用领域&#xff1a; LED照明和调光的新纪元随着LED照明技术的不断发展&#xff0c;人们对于照明调光的需求也越来越高。PWM调光技术作为一种常用的调光方法&…

RUST笔记:candle使用基础

candle介绍 candle是huggingface开源的Rust的极简 ML 框架。 candle-矩阵乘法示例 cargo new myapp cd myapp cargo add --git https://github.com/huggingface/candle.git candle-core cargo build # 测试&#xff0c;或执行 cargo ckeckmain.rs use candle_core::{Device…

设计模式—行为型模式之责任链模式

设计模式—行为型模式之责任链模式 责任链&#xff08;Chain of Responsibility&#xff09;模式&#xff1a;为了避免请求发送者与多个请求处理者耦合在一起&#xff0c;于是将所有请求的处理者通过前一对象记住其下一个对象的引用而连成一条链&#xff1b;当有请求发生时&am…

wsl下安装ros2问题: Unable to locate package ros-humble-desktop 解决方案

❗ 问题 在wsl&#xff08;Ubuntu 22.04版本&#xff09;下安装ros的过程中&#xff0c;在执行命令 $ sudo apt install ros-humble-desktop一直弹出报错&#xff1a;Unable to locate package ros-humble-desktop 前面设置编码和添加源的过程中一直没有出现其他问题&#…

Docker 配置 Gitea + Drone 搭建 CI/CD 平台

Docker 配置 Gitea Drone 搭建 CI/CD 平台 配置 Gitea 服务器来管理项目版本 本文的IP地址是为了方便理解随便打的&#xff0c;不要乱点 首先使用 docker 搭建 Gitea 服务器&#xff0c;用于管理代码版本&#xff0c;数据库选择mysql Gitea 服务器的 docker-compose.yml 配…

基于Java+SpringBoot+vue+elementui的校园文具商城系统详细设计和实现

基于JavaSpringBootvueelementui的校园文具商城系统详细设计和实现 欢迎点赞 收藏 ⭐留言 文末获取源码联系方式 文章目录 基于JavaSpringBootvueelementui的校园文具商城系统详细设计和实现前言介绍&#xff1a;系统设计&#xff1a;系统开发流程用户登录流程系统操作流程 功能…

剧本杀小程序开发:打造沉浸式推理体验

随着社交娱乐形式的多样化&#xff0c;剧本杀逐渐成为年轻人喜爱的聚会活动。而随着技术的发展&#xff0c;剧本杀小程序的开发也成为了可能。本文将探讨剧本杀小程序开发的必要性、功能特点、开发流程以及市场前景。 一、剧本杀小程序开发的必要性 剧本杀是一种角色扮演的推…

【七、centos要停止维护了,我选择Almalinux】

搜索镜像 https://developer.aliyun.com/mirror/?serviceTypemirror&tag%E7%B3%BB%E7%BB%9F&keywordalmalinux dvd是有界面操作的&#xff0c;minimal是最小化只有命里行 镜像下载地址 安装和centos基本一样的&#xff0c;操作命令也是一样的&#xff0c;有需要我…

Unity配置表xlsx/xls打包后读取错误问题

前言 代码如下&#xff1a; //文本解析private void ParseText(){//打开文本 读FileStream stream File.Open(Application.streamingAssetsPath excelname, FileMode.Open, FileAccess.Read, FileShare.Read);//读取文件流IExcelDataReader excelRead ExcelReaderFactory…

idea中debug Go程序报错error layer=debugger could not patch runtime.mallogc

一、问题场景 在idea中配置了Go编程环境&#xff0c;可以运行Go程序&#xff0c;但是无法debug&#xff0c;报错error layerdebugger could not patch runtime.mallogc: no type entry found, use ‘types’ for a list of valid types 二、解决方案 这是由于idea中使用的d…