【多线程-从零开始-捌】阻塞队列,消费者生产者模型

什么是阻塞队列

阻塞队里是在普通的队列(先进先出队列)基础上,做出了扩充

  1. 线程安全
    • 标准库中原有的队列 Queue 和其子类,默认都是线程不安全的
  2. 具有阻塞特性
    • 如果队列为空,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程往队列里添加元素为止
    • 如果队列满了,进行队列操作,此时就会出现阻塞。一直阻塞到其他线程从队列里取走元素为止

基于阻塞队列,最大的应用场景,就是实现“生产者消费者模型”(日常开发中,常见的编程手法)

生产者消费者模型

比如:
小猪佩奇一家准备包饺子,成员有佩奇,猪爸爸和猪妈妈,外加一个桌子

  • 佩奇负责擀面皮
  • 猪爸爸和猪妈妈负责包饺子
  • 桌子用来放你擀好的面皮
    每次佩奇擀好一个面皮后,就放在桌子上,猪爸爸和猪妈妈就用这个面皮包出一个饺子

此时:

  • 佩奇就是面皮的生产者——生产者
  • 猪爸爸和猪妈妈就是面皮的消费者——消费者
  • 桌子就是阻塞队列——阻塞队列

为什么是是阻塞队列而不是普通队列?


因为阻塞队列可以很好的协调生产者和消费者

  • 若佩奇擀面皮很快,不一会桌子上就满了
    • 阻塞队列:佩奇就休息一下,等面皮被消耗一些之后继续再擀
    • 普通队列:不会停,放不下了也一直擀
  • 若猪爸爸和猪妈妈包的很快,不一会桌子上就空了
    • 阻塞队列:猪爸爸和猪妈妈休息一下,等到面皮擀出来之后再包
    • 普通队列:不会停,没面皮了也一直包

好处

上述生产者消费者模型在后端开发中,经常会涉及到
当下后端开发,常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用

主要有两方面的好处

1. 服务器之间解耦合

我们希望见到“低耦合”

  • 模块之间的关联程度/影响程度

通常谈到的“阻塞队列”是代码中的一个数据结构
但是由于这个东西太好用了,以至于会把这样的数据结构单独封装成一个服务器程序,并且在单独的服务器机器上进行部署
此时,这样的饿阻塞队列有了一个新的名字,“消息队列”(Message Queue,MQ)

如果是直接调用image.png|354

  • 编写 A 和 B 代码中,会出现很多对方服务器相关的代码
  • 并且,此时如果 B 服务器挂了,A 可能也会直接受到影响
  • 再并且,如果后续想加入一个 C 服务器,此时对 A 的改动就很大

如果是通过阻塞队列
image.png|526

  • A 之和队列通信
  • B 也只和队列通信
  • A 和 B 互相不知道对方的存在,代码中就更没有对方的影子
    看起来,A 和 B 之间是解耦合了,但是 A 和队列,B 和队列之间,不是引入了新的耦合吗?
  • 耦合的代码,在后续的变更工程中,比较复杂,容易产生 bug
  • 但消息队列是成熟稳定的产品,代码是稳定的,不会频繁更改。A、B 和队列之间的耦合,对我们的影响微乎其微
  • 再增加 C 服务器也很方便,也不会影响到原有的 A 和 B 服务器
2. “削峰填谷”的效果

通过中间的阻塞队列,可以起到削峰填谷的效果,在遇到请求量激增突发的情况下,可以有效保护下游服务器,不会被请求冲垮

阻塞队列的作用就相当与三峡大坝在三峡的防汛作用

image.png

  • A 向队列中写入数据变快了,但是 B 仍然可以按照原有的速度来消费数据
  • 阻塞队列扛下了这样的压力,就像三峡大坝抗住上游的大量水量的压力
  • 如果是直接调用,A 收到多少请求,B 也收到多少,那很可能直接就把 B 给搞挂了
  • 当 A 不再写入数据的时候,但队列中还存有数据,可以继续工给 B
问题
  1. 为啥一个服务器,收到的请求变多,就容易挂?
  • 一台服务器,就是一台“电脑”,上面就提供了一些硬件资源(包括但不限于 CPU,内存,硬盘,网络带宽…)
  • 就算你这个及其配置再好,硬件资源也是有限的
  • 服务器每次收到一个请求,处理这个请求的过程,就都需要执行一系列的代码,在执行这些代码的过程中,就需要消耗一定的硬件资源(CPU,内存,硬盘,网络带宽…)
  • 这些请求小号的总的硬件资源的量,超过了及其能提供的上限,那么此时机器就会出现(卡死,程序直接崩溃等…)
  1. 在请求激增的时候,A 为啥不会挂?队列为啥不会挂?反而是 B 更容易挂呢?
  • A 的角色是一个“网关服务器”,收到客户端的请求,再把请求转发给其他的服务器
    • 这样的服务器里的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常更少
    • 处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理
  • 同理,队列其实也是比较简单的程序,单位请求消耗的硬件资源,也是比较少见的
  • B 这个服务器,是真正干活的服务器,要真正完成一系列的业务逻辑
    • 这一系列的工作,代码量非常庞大,消耗的时间很多,消耗的系统硬件资源,也是更多的

类似的,像 MySQL 这样的数据库,处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此 MySQL 也是后端系统中,容易挂的部分
对应的,像 Redis 这种内存数据库,处理请求,做的工作远远少于 MySQL,消耗的资源更少,Redis 就比 MySQL 硬朗很多,不容易挂

代价

  1. 需要更多的机器来部署这样的消息队列(小代价)
  2. A 和 B 之间的通信延迟会变长
    • 对于 A 和 B 之间的调用,要求响应时间比较短就不太适合了

每个技术都有优缺点,不能无脑吹,也不能无脑黑

比如:微服务

  • 本质上就是把分布式系统服务拆的更细了,每个服务都很小,只做一项功能
  • 非常适合大公司,部门分的很细
  • 但需要更多的机器,处理请求需要更多的响应时间,更复杂的后端结构,运维成本水涨船高

Java 自带的阻塞队列

阻塞队列在 Java 标准库中也提供了现成的封装——BlockingQueue

image.png|565

  • BlockingQueue 本质上是一个接口,不能直接 new,只能 new 一个类
  • 因为是继承与 Queue,所以 Queue 的一些操作,offerpoll 这些,在 BlockingQueue 中同样可以使用(不过不建议使用,因为都不能阻塞
  • BlockingQueue 提供了另外两个专属方法,都能阻塞
    • put——入列
    • take——出队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

capacity 指的是容量,是一个需要加上的参数

public class Demo10 {  public static void main(String[] args) throws InterruptedException {  BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  queue.put("111");  System.out.println("put成功");  queue.put("111");  System.out.println("put成功");  }
}
//运行结果
put成功
put成功
put成功
  • 只打印了三个,说明第四次 put 的时候容量不够,阻塞了
public class Demo10 {  public static void main(String[] args) throws InterruptedException {  BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);  queue.put("111");  System.out.println("put 成功");  queue.put("111");  System.out.println("put 成功");  queue.take();  System.out.println("take 成功");  queue.take();  System.out.println("take 成功");  queue.take();  System.out.println("take 成功");  }
}
//运行结果
put 成功
put 成功
take 成功
take 成功
  • 由于只有 put 了两次,所以也只有两次 take,随后阻塞住了
public class Demo11 {  public static void main(String[] args) {  BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);  Thread t1 = new Thread(() -> {  int i = 1;  while(true){  try {  queue.put(i);  System.out.println("生产者元素"+i);  i++;  Thread.sleep(1000);  } catch (InterruptedException e) {  throw new RuntimeException(e);  }            }        });        Thread t2 = new Thread(() -> {  while(true) {  try {  Integer i = queue.take();  System.out.println("消费者元素"+i);  } catch (InterruptedException e) {  throw new RuntimeException(e);  }            }        });        t1.start();  t2.start();  }
}
  • 上述程序中,一个线程生产,一个线程消费
  • 实际开发中,通常可能是多个线程生产,多个线程消费

自己实现一个阻塞队列

普通队列

基于数组的队列
实现一个基础的队列

//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  private String[] data = null;  private int head = 0;  private int tail = 0;  private int size = 0;  public MyBlockingQueue(int capacity) {  data = new String[capacity];  }    public void put(String s) {  if(size == data.length) {  //队列满了  return;  }        data[tail] = s;  tail++;  if(tail >= data.length){  tail = 0;  }        size++;  }    public String take() {  if(size == 0) {  //队列为空  return null;  }        String ret = data[head];  head++;  if(head >= data.length){  head = 0;  }        size--;  return ret;  }
}

阻塞队列

  • 队列为空,take 就要阻塞,在其他线程 put 的时候唤醒
  • 队列未满,put 就要阻塞,在其他线程 take 的时候唤醒
//此处不考虑泛型参数,只是基于 String 进行存储  
class MyBlockingQueue {  private String[] data = null;  private int head = 0;  private int tail = 0;  private int size = 0;  private Object locker = new Object();  public MyBlockingQueue(int capacity) {  data = new String[capacity];  }  public void put(String s) throws InterruptedException {  //加锁的对象,可以单独定义一个,也可以直接就地使用this  synchronized (locker) {  if (size == data.length) {  //队列满了,需要阻塞  //return;  locker.wait();  }            data[tail] = s;  tail++;  if (tail >= data.length) {  tail = 0;  }            size++;  //唤醒 take 的阻塞  locker.notify();  }    }  public String take() throws InterruptedException {  String ret = "";  synchronized (locker) {  if (size == 0) {  //队列为空,需要阻塞  //return null;  locker.wait();  }            ret = data[head];  head++;  if (head >= data.length) {  head = 0;  }            size--;  //唤醒 put 的阻塞  locker.notify();  }        return ret;  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/395806.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ 重要特性探究

shared_from_this 使用分析 场景 类的成员函数需要获取指向自身的shared_ptr的时候类成员函数传递shared_ptr给其他函数或者对象的时候&#xff0c;目的是为了管理对象生命周期使用方法 首先类必须继承 std::enable_shared_from_this<T>必须使用 shared_from_this 获取指…

智慧交通:将物联网与人工智能完美融合

智慧交通是当今社会面临的一个重要挑战&#xff0c;也是人们生活质量提高的一个重要方面。通过将物联网技术与人工智能相结合&#xff0c;我们能够实现智慧交通系统的全面升级和优化&#xff0c;为人们带来更加便捷、高效和安全的出行体验。 在智慧交通领域&#xff0c;物联网…

电脑图片损坏打不开怎么办?能修复吗?

照片和视频是记录和保存现实生活中的事件的最好方式。由于手机储存空间有限&#xff0c;一般我们会把有纪念意义的照片放到电脑上进行保存&#xff0c;但有时难免会遇到照片被损坏打不开的情况&#xff0c;一旦遇到这种情况&#xff0c;先不要急&#xff0c;也不要因为照片打不…

【智能控制】第7章 神经网络理论基础,神经网络的分类,原理,发展,神经网络学习算法(北京航天航空大学)

目录 第7章 神经网络理论基础 1. 神经网络的发展 2. 神经网络原理 3. 神经网络的分类 (1) 前向网络 (2) 反馈网络 (3) 自组织网络 4. 神经网络学习算法 (1) 智能Hebb学习规则 (2) Delta&#xff08;δ&#xff09;学习规则 5. 神经网络的特征及…

【Mind+】 掌控板入门教程09 魔法之光

光是地球生命的来源&#xff0c;是人类生活的依据&#xff0c;更是人类认识外部世界的工具。在科技发达的今天&#xff0c;我们可以通过传感器来检测光&#xff0c;利用光帮助我们更好的生活。 今天就让我们一起通过几个小项目来感受光的魔法吧。 项目示例 掌控板…

经验是负债,学习是资产

经验是负债&#xff0c;学习是资产 经验是负债&#xff0c;学习是资产。这是李嘉诚先生的一句名言。他一语道出了学习在企业发展中的推动作用。 企业家经营的目的&#xff0c;无非就是将利润最大化。企业能够产生利润&#xff0c;靠的是提升自身业绩、降低运营成本&#xff0c;…

使用 Java Swing 创建一个最大公约数计算器 GUI 应用

使用Java语言,设计一个程序,实现求取两个正整数的最大公约数。 比较基础的一个Java小程序。 1、效果展示 2、程序代码 package demo; import javax.swing.*; import java.awt.*;

Kafka基本讲解

Kafka基本讲解 一&#xff1a;Kafka介绍 Kafka是分布式消息队列&#xff0c;主要设计用于高吞吐量的数据处理和消息传输&#xff0c;适用于日志处理、实时数据管道等场景。Kafka作为实时数仓架构的核心组件&#xff0c;用于收集、缓存和分发实时数据流&#xff0c;支持复杂的…

【博客搭建 第二篇章】项目中怎么引入其他的 icon

一、注册账号并将图标添加到自己的项目中 1、网站地址&#xff1a;https://www.iconfont.cn/ 2、注册 iconfont 账号 3、登录 iconfont 网站中 4、添加图标到购物车中 5、添加图标到项目中 6、生成在线连接 7、复制连接 二、项目中配置连接地址 找到项目中的 them…

R语言医疗数据分析笔记

分组因子又是什么意思&#xff0c;分组因子和数组的区别是什么 举个实际的例子 分组因子 分组因子是分类变量&#xff0c;用于将数据分成不同组以便于比较或分析。例如&#xff0c;在一项研究中&#xff0c;研究对象的性别&#xff08;男性和女性&#xff09;可以视为一个分组…

OBC充电机电力系统的安全保障

OBC&#xff08;On-Board Charger&#xff09;充电机是电动汽车的关键部件&#xff0c;它负责将外部交流电转换为直流电&#xff0c;为电动汽车的动力电池充电。因此&#xff0c;OBC充电机的电力系统安全保障至关重要。 首先&#xff0c;OBC充电机需要有良好的电气隔离和保护功…

【mysql 第三篇章】一条 update语句是怎么持久化到磁盘上的?

首先看一下这个 SQL 语句你会不会写? 下面是说明执行这个 SQL 语句&#xff0c;数据库底层做了什么操作。 update users set namexxx where id10;在引擎要执行更新语句的时候&#xff0c;比如更新 id10 这行数据时&#xff0c;他会先查看数据在缓冲池中是否存在&#xff0c;如…

C语言指针详解-包过系列(二)目录版

C语言指针详解-包过系列&#xff08;二&#xff09;目录版 1、数组名的深入理解1.1、数组名的本质1.2、数组名本质的两个例外1.2.1、sizeof&#xff08;数组名&#xff09;1.2.2、&数组名 2、使用指针访问数组3、一维数组传参本质4、二级指针4.1、二级指针介绍4.2、二级指针…

8.9 C++

1.思维导图 2. 搭建一个货币的场景&#xff0c;创建一个名为 RMB 的类&#xff0c;该类具有整型私有成员变量 yuan&#xff08;元&#xff09;、jiao&#xff08;角&#xff09;和 fen&#xff08;分&#xff09;&#xff0c;并且具有以下功能&#xff1a; (1)重载算术运算符…

PCL 曲线4点细分算法

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 四点细分算法(Four-Point Subdivision Scheme)是一种用于生成平滑曲线的细分算法。与 Chaikin 逼近型细分算法不同,四点细分算法通过插入新的控制点来细化曲线,并生成一条逐步逼近的平滑曲线。该算法通常用于生…

高效管理:如何快速查询并跟踪批量快递物流信息

在现代快节奏的生活中&#xff0c;我们经常需要处理大量的快递单号&#xff0c;以跟踪货物的物流轨迹。无论是电商卖家、物流公司还是个人用户&#xff0c;都希望能够快速、准确地获取到快递的实时信息。为了解决这个问题&#xff0c;我们可以借助一款名为“固乔快递查询助手”…

八、MyBatis

一、MyBatis介绍 MyBatis 是持久层框架&#xff0c;它支持自定义 SQL、存储过程以及⾼级映射。MyBatis 去除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来配置 和映射原始类型、接口和 Java POJO&#xff08;Plain Old Java Obj…

最新版的AutoGPT,我搭建好了

最近AutoGPT不是更新了嘛 安装 我按照官方的教程 在本地搭建好了 改动 可见的改动&#xff0c;主要是把原来的纯命令行改成前后端的形式 看下前端界面 界面比较简单&#xff0c;主要分3个大块 监控 第一个是监控 主要是看你在 build 里构建的Agents的运行情况 build 第一个是Ag…

前端项目中的Server-sent Events(SSE)项目实践及其与websocket的区别

前端项目中的Server-sent Events(SSE)项目实践 前言 在前端开发中&#xff0c;实时数据更新是提升用户体验的重要因素之一。Server-SentEvents(SSE)是一种高效的技术&#xff0c;允许服务器通过单向连接将实时数据推送到客户端。下面将从SSE的基本改变&#xff0c;使用场景展…

大数据Flink(一百零九):阿里云Flink的基本名称概念

文章目录 阿里云Flink的基本名称概念 一、层次结构 二、​​​​​​​​​​​​​​概念说明 1、工作空间&#xff08;Workspace&#xff09; 2、项目空间&#xff08;Namespace&#xff09; 3、资源&#xff08;Resource&#xff09; 4、草稿&#xff08;Draft&#…