Java并发编程(七)实践[生产者-消费者]

生产者-消费者 

概述

  • 生产者消费者问题,也称有限缓冲问题,是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程
  • 在多线程开发中,如果生产者(生产数据的线程)处理速度很快,而消费者(消费数据的线程)处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式

具体实现

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通信。生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。所以在并发场景下,多线程对临界区资源(即共享资源)的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略

synchronized+wait/notify实现

package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Random;
public class TestProducerConsumer1 {public static void main(String[] args) throws InterruptedException {//创建一个自定义的Sy阻塞队列,其中存储整型的个数为10MySnchronizedBlockingQueueS mySnchronizedBlockingQueueS = new MySnchronizedBlockingQueueS(10);int resourceCount = mySnchronizedBlockingQueueS.size(); //阻塞队列资源个数//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数mySnchronizedBlockingQueueS.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");} catch (Exception e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+mySnchronizedBlockingQueueS.take() + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");} catch (Exception e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
/*****  自定义阻塞队列: 通过Synchronized + wait/notifyAll实现* @author Bierce* @date 2023/08/15*/
class MySnchronizedBlockingQueueS {private final int maxSize; //容器允许存放的最大数量private final LinkedList<Integer> container; //存储数据的容器public MySnchronizedBlockingQueueS(int maxSize ) {this.maxSize = maxSize;this.container = new LinkedList<>();}/***  往队列添加元素,如果队列已满则阻塞线程*/public  synchronized  void put(Integer data){//如果队列已满,则阻塞生产者线程while (container.size()==maxSize){try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//队列未满则添加元素,并通知消费者消费数据container.add(data);notifyAll();}/***  从队列取出数据,如果队列为空则阻塞* @return  队列元素*/public synchronized  Integer take(){//如果队列为空,则消费者停止消费while (container.size()==0){try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//队列不为空则消费数据,并通知生产者继续生产数据int data = container.poll();notifyAll();return data;}public int size(){return container.size();}
}

synchronized无法实现精确通知的效果,而Condition可以达到精确通知哪个线程要被唤醒 

Lock+Condition实现

package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestProducerConsumer2 {public static void main(String[] args) throws InterruptedException {//创建一个自定义的阻塞队列,其中存储整型的个数为10MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(10, false);int resourceCount = myBlockingQueue.size(); //阻塞队列资源个数//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数myBlockingQueue.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+myBlockingQueue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+myBlockingQueue.take() + ",当前资源池有"+myBlockingQueue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
/***自定义阻塞队列: Lock+Condition实现*/
class MyBlockingQueue<E> {private final Queue queue; //队列容器private final int capacity; //队列容量final ReentrantLock lock; //对象锁private final Condition notEmpty; //等待取出数据条件private final Condition notFull; //等待添加数据条件/*** 初始化阻塞队列* @param capacity  队列容量* @param fair  是否公平锁*/public MyBlockingQueue(int capacity, boolean fair) {this.queue = new LinkedList();this.capacity=capacity;this.lock = new ReentrantLock(fair);this.notEmpty = lock.newCondition();this.notFull =  lock.newCondition();}/***   往队列插入元素,如果队列大小到达容量限制则阻塞* @param e 插入元素* @throws InterruptedException 中断异常*/public  void put(E e) throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock(); //上锁try{while (queue.size()==capacity){ //队列已满则阻塞notFull.await();}//队列未满则加入数据并唤醒消费者进行消费queue.add(e);notEmpty.signalAll();} finally {lock.unlock(); //必须释放锁}}/***   从队列取出一个元素,如果队列为空则阻塞* @return 队列元素* @throws InterruptedException 中断异常*/public  E take()throws  InterruptedException{final ReentrantLock lock = this.lock;lock.lock();try{while (queue.size()==0){ //队列为空则阻塞notEmpty.await();}//队列有数据则获取数据并唤醒生产者进行生产E element = (E) queue.remove();notFull.signalAll();return   element;} finally {lock.unlock(); //必须释放锁}}public int size(){return queue.size();}
}
自定义myBlockingQueue控制台输出

阻塞队列BlockingQueue实现

public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}
}
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();
}
/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;
}

根据ArrayBlockingQueue的put和take方法源码可知其底层最终使用的仍是Lock+condition机制

package com.bierce.multiThread;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestProducerConsumer3 {public static void main(String[] args) throws InterruptedException {//创建一个阻塞队列,其中存储整型的个数为10BlockingQueue<Integer> queue= new ArrayBlockingQueue<>(10);int resourceCount = queue.size(); //阻塞队列资源个数//System.out.println("资源总数:" + resourceCount);//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数queue.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+queue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+queue.take() + ",当前资源池有"+queue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
BlockingQueue控制台输出

扩展

  • 通过信号量semaphore实现
  • 通过PipedInputStream/PipedOutputStream实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/91417.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docsify gitee 搭建个人博客

docsify & gitee 搭建个人博客 文章目录 docsify & gitee 搭建个人博客1.npm 安装1.1 在Windows上安装npm&#xff1a;1.2 在macOS上安装npm&#xff1a;1.3 linux 安装npm 2. docsify2.1 安装docsify2.2 自定义配置2.2.1 通过修改index.html&#xff0c;定制化开发页面…

【讯飞星火认知大模型】大模型之星火手机助理

目录 1. 讯飞星火认知大模型介绍 2. API 申请 3. 星火手机助理 4. 效果展示 1. 讯飞星火认知大模型介绍 讯飞星火认知大模型是科大讯飞自研的基于深度学习的自然语言处理模型&#xff0c;它可以理解和生成中文&#xff0c;执行多种任务&#xff0c;如问答、翻译、写作、编…

uniapp的逆地理编码 和地理编码

1.先打开高德地图api找到那个 地理编码 2.封装好我们的请求 3.逆地理编码 和地理编码 都是固定的 记住自己封装的请求 就可以了 这个 是固定的 方式 下面这个是固定的 可以复制过去 getlocation就是uniapp提供的 获取经纬度 然后 下面的 就是高德地图提供的 方法 要想使用我…

arcgis数据采集与拓扑检查

1、已准备好一张配准好的浙江省行政区划图&#xff0c;如下&#xff1a; 2、现在需要绘制湖州市县级行政区划。需要右击文件夹新建文件地理数据库&#xff0c;如下&#xff1a; 其余步骤均默认即可。 创建好县级要素数据集后&#xff0c;再新建要素类&#xff0c;命名为县。 为…

什么是Selenium?使用Selenium进行自动化测试

什么是 Selenium&#xff1f;   Selenium 是一种开源工具&#xff0c;用于在 Web 浏览器上执行自动化测试&#xff08;使用任何 Web 浏览器进行 Web 应用程序测试&#xff09;。   等等&#xff0c;先别激动&#xff0c;让我再次重申一下&#xff0c;Selenium 仅可以测试We…

无涯教程-Perl - sethostent函数

描述 该函数应在首次调用gethostent之前调用。 STAYOPEN参数是可选的,在大多数系统上未使用。 当gethostent()检索主机数据库中下一行的信息时,然后sethostent设置(或重置)枚举到主机条目集的开头。 语法 以下是此函数的简单语法- sethostent STAYOPEN返回值 此函数不返回…

分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测

分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测 目录 分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab实现BO-BiGRU贝叶斯优化双向门控循环单元多特征分…

Nodejs+vue+elementui汽车租赁管理系统_1ma2x

语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;VScode 前端nodejsvueelementui, 课题主要分为三大模块&#xff1a;即管理员模块、用户模块和普通管理员模块&#xff0c;主要功能包括&#…

小程序制作教程:从零开始搭建企业小程序

在如今的数字化时代&#xff0c;企业介绍小程序成为了企业展示与推广的重要工具。通过企业介绍小程序&#xff0c;企业可以向用户展示自己的品牌形象、产品服务以及企业文化等内容&#xff0c;进而提高用户对企业的认知度和信任度。本文将介绍如何从零开始搭建一个企业介绍小程…

《Java极简设计模式》第03章:工厂方法模式(FactoryMethod)

作者&#xff1a;冰河 星球&#xff1a;http://m6z.cn/6aeFbs 博客&#xff1a;https://binghe.gitcode.host 文章汇总&#xff1a;https://binghe.gitcode.host/md/all/all.html 源码地址&#xff1a;https://github.com/binghe001/java-simple-design-patterns/tree/master/j…

突破网络编程1024限制的方法(修改配置文件)

文章目录 概述修改linux配置相关命令步骤1. 打开终端2. 使用sudo权限编辑文件3. 添加资源限制配置4. 保存和退出5. 重启系统或重新登录 其他方法1. 使用事件驱动的框架2. 使用连接池3. 负载均衡4. 使用线程池和进程池5. 升级操作系统设置6. 使用专业的高性能服务器7. 分布式架构…

PHP最简单自定义自己的框架控制器自动加载运行(四)

1、实现效果调用控制中方法 2、创建控制器indexCrl.php <?php class indexCrl{public function index(){echo 当前index控制器index方法;} } 3、KJ.php字段加载控制器文件 public static function run(){//定义常量self::_set_const();//创建模块目录self::_mk_module();…

不基于比较的排序:基数排序

本篇只是讨论桶排序的具体实现&#xff0c;想了解更多算法内容可以在我的博客里搜&#xff0c;建议大家看看这篇排序算法总结&#xff1a;排序算法总结_鱼跃鹰飞的博客-CSDN博客 桶排序的原理&#xff1a; 代码&#xff1a;sort1是一个比较二逼的实现方式浪费空间&#xff0c;s…

day4 IO模型

IO多路复用 1.select函数 服务器&#xff1a; 客户端 poll函数 客户端&#xff1a;

DEWDROP65 DM蓝牙5.2双模热插拔PCB

键盘使用说明索引&#xff08;均为出厂默认值&#xff09; 软件支持&#xff08;驱动的详细使用帮助&#xff09;一些常见问题解答&#xff08;FAQ&#xff09;首次使用步骤蓝牙配对规则&#xff08;重要&#xff09;蓝牙和USB切换键盘默认层默认触发层0的FN键配置的功能默认功…

探索未来:元宇宙与Web3的无限可能

随着科技的奇迹般发展&#xff0c;互联网已经成为了我们生活的不可分割的一部分。然而&#xff0c;尽管它的便利性和普及性带来了巨大的影响&#xff0c;但我们仍然面临着传统互联网体验的诸多限制。 购物需要不断在实体店与电商平台间切换&#xff0c;教育依然受制于时间与地…

设备数字化平台的优势和应用价值

在现代工业领域&#xff0c;设备的高效管理和维护对于企业的运营和竞争力至关重要。而设备管理系统作为一个强大的工具&#xff0c;可以极大地提升设备管理和维护的效率&#xff0c;从而实现生产效益的最大化。本文将探讨设备数字化平台的优势和应用价值。 设备数字化平台是一款…

蓝牙耳机运动耳机哪个好、好用的运动蓝牙耳机推荐

如今的蓝牙耳机已经成为手机的最佳伴侣&#xff0c;也是运动爱好者的必备装备。然而&#xff0c;在众多蓝牙耳机中做出选择可能会让人感到困惑。其实&#xff0c;在选购运动蓝牙耳机时需要注意的事项还挺多的&#xff0c;比如舒适度、稳定性和音质等多个方面,逐一对照这些要点来…

MySQL数据库基础语法

一&#xff0c;数据库操作 数据库中不区分大小写&#xff01;&#xff01;&#xff01; 1.1 显示数据库 show databases ; 如图&#xff1a; 1.2 创建数据库 create database [ if not exists ]数据库名 ; 如图&#xff1a; 1.3 使用数据库 use 数据库名 &#xff1b; 如图&a…

Vue3 Axios网络请求简单应用

cd 到项目 安装Axios&#xff1a;cnpm install --save axios post传递参数 需要安装querystring 用于转换参数格式&#xff1a;cnpm install --save querystring 运行示例&#xff1a; 后台接口&#xff1a; GetTestData.java package com.csdnts.api;import java.io.IOExce…