【JavaEE初阶 — 多线程】生产消费模型 阻塞队列

   c96f743646e841f8bb30b2d242197f2f.gif

ddb5ae16fc92401ea95b48766cb03d96.jpeg692a78aa0ec843629a817408c97a8b84.gif


    1. 阻塞队列     


   (1) 阻塞队列   


  1. 概念    


阻塞队列是一种特殊的队列,也遵守"先进先出"的原则;
阻塞队列能是一种线程安全的数据结构,主要用来阻塞队列的插入和获取操作:

  • 当队列满了的时候,插入操作会被阻塞,直到队列有空位。
  • 当队列为空的时候,获取操作会被阻塞,直到队列有值。

常用在实现生产者和消费者场景,在笔试题中比较常见。


   2. 如何创建一个阻塞队列      


  • 阻塞队列 BlockingQueue 继承 Queue;

  • 在 Java 标准库中内置了阻塞队列,如果我们需要在一些程序中使用阻塞队列,直接使用标准库中的即可.
  • BlockingQueue是一个 interface 接口,真正实现的类是LinkedBlockingQueue.


   3 查看 put() 和 take() 的阻塞效果  


  • 入队列,出队列操作,对比 queue 的 offer() 和 poll(),阻塞队列更常用的是put(),take();
  • put(),take() 具有阻塞功能;


    (1) 查看 take() 的阻塞功能     
  • 如果队列满了,使用 offer() 会报错,put() 则不会;
  • 因为使用 put() 有阻塞效果,等到别的线程调用 take() 出一个满队列的元素后,put() 的阻塞状态才会结束;

  • 如果没有 put(),直接 take(),此时 take() 的就是一个空队列;
  • 再次执行程序,执行结果什么也没有打印,说明是程序执行 take() 时被阻塞了;

我们通过jconsole来查看当前线程状态:

当前线程状态是waiting,说明该线程进入了没有时间的等待。


   (2) 查看 put() 的阻塞功能     

因为 LinkedBlockingQueue 是基于链表实现的阻塞队列,是可以选择无界(不传 capacity)和有界(传 capacity)的; 

如果不传 capacity,就没那么好验证 put() 和 take() 的阻塞功能,并且在实际开发中,一般建议大家设置 capacity ,避免因为队列过大,导致内存被耗尽,产生“内存超出范围”这样的异常;

所以我们给  LinkedBlockingQueue 传参,并写出如下代码,来验证 put() 的阻塞功能:

运行程序

 从当前执行结果来看,通过打印日志可以发现,程序被阻塞在了最后一个put(),没有打印后面的日志;可以再次 jconsole 查看线程状态:


   总结: 

  • take() 和 put() 是Java标准库中,对于阻塞队列,带有阻塞功能的API;通过上述例子,我们已经初步演示了 take() 和 put() 的阻塞功能;
  • 值得一提,BlockingQueue也有offer, poll, peek等方法,但是这些方法不带有阻塞特性.

    2. 生产消费模型   


   (1) 生产消费模型的概念   


  • 生产者消费者模式是是一种典型的编码技巧,通过一个容器,来解决生产者和消费者的强耦合问题。
  • 生产者和消费者不会直接进行数据交互(通讯),而是通过阻塞队列进行数据交互(通讯);
  • 生产者生产完数据之后,不用等待消费者处理,而是直接扔给阻塞队列,消费者也不会找生产者要数据,而是直接从阻塞队列里取数据。

     (2) 阻塞队列运用场景    


  • 上游服务器A:入口服务器,干的活更简单,单个请求消耗的资源数少;
  • 下游服务器B:通常承担更重的任务量,复杂的计算/存储工作,单个请求消耗的资源数更多;
  • 队列服务器:主要用于存储与转发,针对单个请求,做的事少,可以抗很多的请求量;

生产消费模型的优点:


     解耦合,降低代码耦合度    

  1. A B两个服务器,如果直接进行数据交互,后续单独对A或者B的数据进行修改,大概率就会影响到另一个服务器的正常运转。
  2. 使用阻塞队列作为交易平台,在修改服务器的数据时,由于阻塞队列中的结构固定,两个服务器之间的耦合度就降低,因此能节约后期修改代码的成本

  • 削峰削谷:

  1. 在服务器中,波峰就是请求量高的时候,波谷就是请求量低的时候;
  2. 如果 A 和 B 直接进行交互,当上游服务器A经历波峰,将大量请求传给下游服务器B 的时候,B就有可能挂掉;
  3. 通过队列进行交互,A处理的请求的数量和速度,和B处理的请求的数量和速度就不一样了A处理请求的数量和速度,取决于外面用户的访问量,而B服务器则根据自身来决定;

  4. 哪怕A已经是惊涛骇浪了,B依然波澜不惊,根据自己能够承担的处理请求速度,慢条斯理地消费阻塞队列中的数据;

  5. 运用生产消费模型,即使遇到突发的流量峰值,B服务器也不会轻易挂掉。

  6. 由于流量峰值一般是突发的,时间也比较短;趁着峰值过去了后,B会利用波谷的时间,消费之前波峰积压的数据;


   (3) 生产消费模型代码演示    


    生产消费模型初步演示    


   创建阻塞队列   


   生产者线程   

   消费者线程   


    上述代码逻辑    

  • producer 会因为 while(true),而不断生产数字n,并且 n 随着循环次数增加而增加,并且会不断把生产的数字通过 put(n),入队列;
  • consumer 也会因为 while(true),不断通过 take(),从队列中取出 producer 生产的数字 n,并且打印当前 n 的值;

程序运行结果

    结论     


  • 我们通过打印日志,可以发现,两个线程的执行速度旗鼓相当,生产者线程的生产速度很快,消费也很快,并没有阻塞效果;
  • 上述情况在开发中是一个典型的情况,虽然生产消费模型也会产生阻塞,但是只要我们协调好生产和消费的速度,两个线程的速度相差不大,那么程序都会一直高效地运行

   查看模型阻塞效果    


上述的 producer 和 consumer 两个线程的速度旗鼓相当,直接运行,很难观察到阻塞效果;

我们可以手动协调生产者线程和消费者线程入队列和出队列的速度,来查看阻塞队列产生的阻塞效果:


   观察队列为空的阻塞效果    

我们让 producer 在每次生产元素时,休眠 1 秒,而 consumer 保持原来的消费速度;

阻塞队列中的元素被消耗的速度远远大于生产的速度,进而阻塞队列对 consumer 产生阻塞效果 

   程序运行结果   

通过降低 producer 调用 put() 的速度,进而让阻塞队列对 consumer 的 take() 产生阻塞效果;


    观察队列为满的阻塞效果    

上述是通过降低 producer 的生产速度,来查看阻塞队列对消费者的阻塞效果;

现在,我们来降低 consumer 的消费速度,来观察阻塞队列对 producer 的阻塞效果:

    程序运行结果    


  • 在减低 consumer 的消费速度后,程序会在执行后的一秒,从只调度 producer ,到 producer 和 consumer 并发调度,并且程序只在执行的第一秒,producer 就把阻塞队列填满了;
  • 之后因为队列满了,所以只能 consumer 消费一个元素,生产者才能再生产一个元素;

所以这个案例,是通过减低 consumer 的调用 take(),来让阻塞队列对 producer 调用 put() 的操作产生阻塞效果。


    3. 阻塞队列的模拟实现   


   (1) 实现原理     


 通过基于数组,实现循环队列,在循环队列的基础上,实现阻塞功能:


   (2) 模拟实现     


    1. 基于数组,先模拟实现一个普通队列    


上面不小心把 MyBlockingQueue 设置成内部类了,修改一下:

   2. 成员变量    


通过定义 front 和 tail,来记录队守和队尾在队列中的相关信息,确定入队列和出队列在队列中的相对位置,并且定义 size ,记录队列中的元素个数;

    3. 循环队列原理    

  • tail 指针指向要 put() 元素的空间,front 指针指向要 take 的区间;
  • tail 指针指向一个空的下标元素后,会对这个空间 put() 一个元素,然后 tail++; 
  • front 指针指向一个的下标元素后,会 take() 这个空间的元素,然后 front++; 
  • 如果 front 和 tail 大于 data.length,则令这两个指针重新置为0;

  • 当 front 和 tail 指向同一个元素时,要么队列为空,要么队列为满,put 和 take 两个方法一定会有一个被阻塞;
  • 所以不用担心一个空间的元素被连续 put()两次 (入队列的 elem 还没出队列就被新的elem 覆盖),或者连续 take() (elem已经出队列,还对空的空间进行出队列操作) 两次;
  • 一定是 take() 和 put() 两个反复穿插操作同一个空间。

   4. 实现 put() 方法   


  • 如果队列满了,进入阻塞状态,否则将 tail 指向的数组空间置为要插入的元素 elem;
  • 如果 tail 走出 data 范围,把 tail 重新置为0;
  • 每 put 一个元素,元素个数 size++;


   5. 实现 take() 方法   


  • 如果队列为空,进入阻塞状态,否则接收 front 指向的数组元素并返回;
  • 返回元素后,后续该元素会被 tail 指针覆盖;
  • 如果 front 走出 data 范围,把 front 重新置为0;
  • 每 take 一个元素,元素个数 size--;


   6. 完善 put() 和 take()   


      (1) 确保线程安全     

在多线程调用 put() 和 take() 的情况下,put() 和 take() 的操作并不是原子的,并且都包含了多步写操作,为了保证线程安全,我们需要对两个线程进行上锁:

哪个线程调用 put() 或者 take(),就对哪个线程上锁,确保该线程完整地执行完  put() 或者 take();


    (2) 完善阻塞功能     

  • 阻塞队列只会针对 put() 或者 take() 其中一个方法进行阻塞,如果执行其中一个方法的线程进入阻塞,只能通过其他线程调用另一个方法来唤醒;

  • 一个线程 t 在执行take() 时,因为队列为空,执行wait方法,陷入阻塞状态,只能等执行put() 的线程来唤醒;
  • 如果此时没有执行 put() 线程,但是有别的线程调用 t . interrupt(),从而结束正在 wait() 的 t;

  • 如果出现这种情况,我们当前这个代码,是直接向外抛异常 InterruptedException;但是如果我们是通过 try catch 来捕获,并且解决异常的情况又有不同:

  • 如果我们是通过 throw 抛出 wait() 相应的异常 ,那么 wait() 如果被 interrupt() 打断等待状态,就会抛出异常;
  • 如果是通过 try catch 来处理 wait() 相应的异常,这样的操作会捕获并且处理 wait() 被 interrupt() 打断而抛出的异常;
  • 我们的预期结果,就是让 wait() 被打断而不是被唤醒的时候抛出异常,但是 try catch 把异常给捕获了,因此 wait() 在被打断就不会抛出异常了并且不管队列是否为空或者满,直接打破阻塞状态而执行下面的逻辑;

标准库建议:

  • 如果要通过特定的条件,来使当前线程进入等待状态,那么建议这个特点的条件的判断操作,应该放在 while 循环中,而不是放在 if 中;
  • if 只会判断一次,在判断一次成功执行 wait() 后,可能 wait() 被打破不是因为不满足 if 中的条件,而是被别的线程调用 interrupt() 类似的方法,而非法终止 wait() ;
  • 通过在 wait() 外面嵌套一层 while ,而不是嵌套 if,可以避免 wait() 非法唤醒;


     7. 模拟生产消费模型     


生产速度等于消费速度,producer 和 consumer  都不会被阻塞;

生产速度小于消费速度,阻塞 consummer 

生产速度大于消费速度,阻塞 producer


     8. 模拟实现阻塞队列完整代码    

package Thread;class MyBlockingQueue {private String[] data = null;private int front = 0; //队首private int tail = 0; //队尾private int size = 0; //元素个数public MyBlockingQueue(int capacity) {data = new String[capacity];}public void put(String elem) throws InterruptedException {synchronized (this) {while (size == data.length) {//队列满了,需要阻塞this.wait();}data[tail] = elem;tail++;if (tail >= data.length) tail = 0;size++;this.notify();}}public String take() throws InterruptedException {synchronized (this) {while (size == 0) {//队列为空,需要阻塞this.wait();}String ret = data[front];front++;if (front >= data.length) front = 0;size--;this.notify();return ret;}}
}public class Demo26 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(1000);Thread producer = new Thread(() -> {int n = 0;while (true){try {queue.put(n + "");System.out.println("生产元素" + n);n++;} catch (InterruptedException e) {throw new RuntimeException(e);}}},"producer");Thread consumer = new Thread(() -> {String ret = null;while (true){try {ret = queue.take();System.out.println("消费元素" + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"consumer");producer.start();consumer.start();}
}

   4. 常见的阻塞队列   



   1. ArrayBlockingQueue   


  • 一个有界队列,底层基于数组实现
  • 需要在初始化时指定队列的大小,队列满时,生产者会被阻塞,队列空时,消费者会被阻塞。

   2. LinkedBlockingQueue   


  • 基于链表的阻塞队列,允许可选的界限(有界或无界)

  • 无界模式下可以不断添加元素,直到耗尽系统资源。
  • 有界模式则类似于ArrayBlockingQueue,但吞吐量通常较高。

   3. PriorityBlockingQueue   


  • 一个无界的优先级队列,元素按照自然顺序,或者指定的比较器顺序进行排序。
  • 与其他阻塞队列不同的是,PriorityBlockingQueue不保证元素的FIFO 顺序。

   4. DelayQueue    


  • 一个无界队列,队列中的元素必须实现 Delayed 接口,只有当元素的延迟时间到期时,才能被取出。常用于延迟任务调度。

   5. SynchronousQueue   


  • 一个没有内部容量的队列,每个插入操作必须等待对应的移除操作,反之亦然。常用于在线程之间的直接传递任务,而不是存储任务

   6. LinkedTransferQueue   


  • LinkedTransferQueue,相对于其他阻塞队列,从名字来看它有 Transfer 功能,其实也不是什么神奇功能;
  • 一般阻塞队列都是将元素入队,然后消费者从队列中获取元素。LinkedTransferQueue 的transfer 是元素入队的时候看看是否已经有消费者在等了,如果有在等了直接给消费者即可,所以就是这里少了一层,没有锁操作。

   5. 不同阻塞队列的使用场景   


     ArrayBlockingQueue 和 LinkedBlockingQueue 使用场景   


  • ArrayBlockingQueue 和 LinkedBlockingQueue 常用于典型的生产者-消费者场景。
  • 例如:任务处理系统中,生产者生成任务,消费者从队列中取出任务并执行。

    ArrayList 和 LinkedList 的区别    


   1. 底层数据结构不同   


  • ArrayList:基于动态数组实现,元素在内存中连续存储。
  • LinkedList:基于双向链表实现,元素通过节点链接,内存中不需要连续存储。

   2. 性能区别    


   (1)  ArrayList: 

  • 随机访问速度快,查找元素的时间复杂度为0(1)。
  • 插入和删除操作慢,尤其是在中间插入或删除时,时间复杂度为0(n),因为需要移动后续元素。

   (2) LinkedList: 

  • 随机访问速度慢,查找元素的时间复杂度为0(n)。
  • 插入和删除操作快,尤其是在头尾插入或删除时,时间复杂度为0(1)。

     PriorityBlockingQueue 使用场景    


PriorityBlockingQueue 更适合处理带有优先级的任务场景;

例如:任务调度系统。


   DelayQueue 使用场景    


  • DelayQueue 适用于需要延迟处理的任务;
  • 例如:缓存失效策略、定时任务调度等。

   SynchronousQueue 使用场景    


  • SynchronousQueue 适合在线程间直接传递数据,而不希望数据被存储在队列中。
  • 例如:ThreadPoolExecutor 的直接交接模式中使用 SynchronousQueue 来传递任务。

   ArrayBlockingQueue 和 LinkedBlockingQueue 区别   


 ArrayBlockingQueue 和 LinkedBlockingQueue,分别是基于 数组链表 的有界阻塞队列。

两者原理都是基于 ReentrantLock Condition

ArrayBlockingQueue 基于数组,内部实现只用了一把锁,可以指定公平或者非公平锁

LinkedBlockingQueue 基于链表,内部实现用了两把锁,take 一把、put一把,所以入队和出队这两个操作是可以并行的,从这里看并发度应该比 ArrayBlockingQueue 高。


   c96f743646e841f8bb30b2d242197f2f.gif

692a78aa0ec843629a817408c97a8b84.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/471438.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重构开发之道,Blackbox.AI为技术注入智能新动力

本文目录 一、引言二、Blackbox.AI实战体验2.1 基于网页界面生成前端代码进行应用开发2.2 与AI助手实现实时智能对话2.3 重塑大型文件交互方式2.4 链接Github仓库进行对话编程 三、总结 一、引言 在生产力工具加速进化的浪潮中,Blackbox.AI开始崭露头角&#xff0c…

idea 弹窗 delete remote branch origin/develop-deploy

想删除远程分支,就选delete,仅想删除本地分支,选cancel; 在 IntelliJ IDEA 中遇到弹窗提示删除远程分支 origin/develop-deploy,这通常是在 Git 操作过程中出现的情况,可能是在执行如 git branch -d 或其他…

第四十五章 Vue之Vuex模块化创建(module)

目录 一、引言 二、模块化拆分创建方式 三、模块化拆分完整代码 3.1. index.js 3.2. module1.js 3.3. module2.js 3.4. module3.js 3.5. main.js 3.6. App.vue 3.7. Son1.vue 3.8. Son2.vue 四、访问模块module的state ​五、访问模块中的getters ​六、mutati…

【OpenEuler】配置虚拟ip

OpenEuler系统手动配置虚ip 介绍操作方法临时生效永久生效 验证 介绍 我们知道通过keepalived服务可以为linux服务器设置虚拟ip,但是有些特殊场景下若无法安装部署keepalived服务,则需要通过手动设置的方式,配置服务器的虚拟ip。 本方案提供…

CCI3.0-HQ:用于预训练大型语言模型的高质量大规模中文数据集

摘要 我们介绍了 CCI3.0-HQ,它是中文语料库互联网 3.0(CCI3.0)的一个高质量500GB子集,采用新颖的两阶段混合过滤管道开发,显著提高了数据质量。为了评估其有效性,我们在不同数据集的100B tokens上从头开始…

fastadmin多个表crud连表操作步骤

1、crud命令 php think crud -t xq_user_credential -u 1 -c credential -i voucher_type,nickname,user_id,voucher_url,status,time --forcetrue2、修改控制器controller文件 <?phpnamespace app\admin\controller;use app\common\controller\Backend;/*** 凭证信息…

安装SQL server中python和R

这两个都是编程语言 R 是一种专门为统计计算和数据分析而设计的语言&#xff0c;它具有丰富的统计函数和绘图工具&#xff0c;常用于学术研究、数据分析和统计建模等领域。 Python 是一种通用型编程语言&#xff0c;具有简单易学、语法简洁、功能强大等特点。它在数据科学、机…

项目技术栈-解决方案-web3去中心化

web3去中心化 Web3 DApp区块链:钱包:智能合约:UI:ETH系开发技能树DeFi应用 去中心化金融P2P 去中心化网络参考Web3 DApp 区块链: 以以太坊(Ethereum)为主流,也包括Solana、Aptos等其他非EVM链。 区块链本身是软件,需要运行在一系列节点上,这些节点组成P2P网络或者半…

【linux】centos7 换阿里云源

查看yum配置文件 yum的配置文件通常位于/etc/yum.repos.d/目录下。你可以使用以下命令查看这些文件&#xff1a; ls /etc/yum.repos.d/ # 或者 ll /etc/yum.repos.d/备份当前的yum配置文件 建议备份当前的yum配置文件&#xff1a; sudo cp /etc/yum.repos.d/CentOS-Base.re…

Python 中.title()函数和.lower()函数

一.title()函数 1.title()函数的功能 将字符串中的每一单词的首字母大写 2.举例 S1"i love you" S2S1.title() print(S2)3.输出 二.lower()函数 1.lower()函数的功能 将字符串中的每一大写字母都变成的小写字母 2.举例 S1"I LOVE YOU" S2S1.lower()…

[DEBUG] 服务器 CORS 已经允许所有源,仍然有 304 的跨域问题

背景 今天有一台服务器到期了&#xff0c;准备把后端迁移到另一台服务器上&#xff0c;结果前端在测试的时候&#xff0c;出现了 304 的跨域问题。 调试过程中出现的问题&#xff0c;包括但不限于&#xff1a; set the request’s mode to ‘no-cors’Redirect is not allow…

【AI构思渲染】网络直播——建筑绘图大模型生成渲染图

家人们&#xff01;&#xff01;好消息来了&#xff01;&#xff01; 2024年11月19日&#xff0c;上午10:00-11:00 构力学堂将会给大家带来一场直播课《AI构思渲染第一课&#xff0c;建筑绘图大模型生成渲染图》 课程亮点&#xff1a; 1、AI插件相关介绍 2、AI构思渲染安装…

初级数据结构——栈

目录 前言一、栈的基本概念二、栈的实现方式三、栈的性能分析四、栈的应用场景五、栈的变体六、出栈入栈的动态图解七、代码模版八、总结结语 前言 数据结构栈&#xff08;Stack&#xff09;是一种线性的数据结构&#xff0c;它只允许在序列的一端&#xff08;称为栈顶&#x…

ESLint 使用教程(五):ESLint 和 Prettier 的结合使用与冲突解决

前言 在现代前端开发中&#xff0c;代码质量与代码风格的统一是两个非常重要的方面。良好的代码质量能减少 bug 的产生&#xff0c;而统一的代码风格则能提高团队协作的效率。为了实现这两个目标&#xff0c;我们通常会使用一些工具。 为了保证代码的可读性和维护性&#xff0…

简易入手《SOM神经网络》的本质与原理

原创文章&#xff0c;转载请说明来自《老饼讲解神经网络》:www.bbbdata.com 关于《老饼讲解神经网络》&#xff1a; 本网结构化讲解神经网络的知识&#xff0c;原理和代码。 重现matlab神经网络工具箱的算法&#xff0c;是学习神经网络的好助手。 目录 一、入门原理解说 01.…

数字IC后端实现之Innovus specifyCellEdgeSpacing和ICC2 set_placement_spacing_rule的应用

昨天帮助社区IC训练营学员远程协助解决一个Calibre DRC案例。通过这个DRC Violation向大家分享下Innovus和ICC2中如何批量约束cell的spacing rule。 数字IC后端手把手实战教程 | Innovus verify_drc VIA1 DRC Violation解析及脚本自动化修复方案 下图所示为T12nm A55项目的Ca…

深度学习神经网络在机器人领域应用的深度剖析:原理、实践与前沿探索

深度学习神经网络在机器人领域的应用非常广泛&#xff0c;以下是详细介绍&#xff1a; 主要应用方向 环境感知与识别&#xff1a; 物体识别与分类&#xff1a;机器人利用深度学习神经网络能够准确识别周围环境中的各种物体&#xff0c;比如区分不同形状、颜色、材质的物品&…

自动化工具 Gulp

自动化工具 gulp 摘要 概念&#xff1a;gulp用于自动化开发流程。 理解&#xff1a;我们只需要编写任务&#xff0c;然后gulp帮我们执行 核心概念&#xff1a; 任务&#xff1a;通过定义不同的任务来组织你的构建流程。 管道&#xff1a;通过管道方式将文件从一个插件传递…

基于Spring Boot与Redis的令牌主动失效机制实现

目录 前言1. 项目结构和依赖配置1.1 项目依赖配置1.2 Redis连接配置 2. 令牌主动失效机制的实现流程2.1 登录成功后将令牌存储到Redis中2.2 使用拦截器验证令牌2.3 用户修改密码后删除旧令牌 3. Redis的配置与测试4. 可能的扩展与优化结语 前言 在现代Web系统中&#xff0c;用…

llama factory lora 微调 qwen2.5 7B Instruct模型

项目背景 甲方提供一台三卡4080显卡 需要进行qwen2.5 7b Instruct模型进行微调。以下为整体设计。 要使用 LLaMA-Factory 对 Qwen2.5 7B Instruct模型 进行 LoRA&#xff08;Low-Rank Adapters&#xff09;微调&#xff0c;流程与之前提到的 Qwen2 7B Instruct 模型类似。LoRA …