生产者-消费者
概述
- 生产者消费者问题,也称有限缓冲问题,是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程
- 在多线程开发中,如果生产者(生产数据的线程)处理速度很快,而消费者(消费数据的线程)处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式
具体实现
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通信。生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。所以在并发场景下,多线程对临界区资源(即共享资源)的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略
synchronized+wait/notify实现
package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Random;
public class TestProducerConsumer1 {public static void main(String[] args) throws InterruptedException {//创建一个自定义的Sy阻塞队列,其中存储整型的个数为10MySnchronizedBlockingQueueS mySnchronizedBlockingQueueS = new MySnchronizedBlockingQueueS(10);int resourceCount = mySnchronizedBlockingQueueS.size(); //阻塞队列资源个数//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数mySnchronizedBlockingQueueS.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");} catch (Exception e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+mySnchronizedBlockingQueueS.take() + ",当前资源池有"+mySnchronizedBlockingQueueS.size()+"个资源");} catch (Exception e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
/***** 自定义阻塞队列: 通过Synchronized + wait/notifyAll实现* @author Bierce* @date 2023/08/15*/
class MySnchronizedBlockingQueueS {private final int maxSize; //容器允许存放的最大数量private final LinkedList<Integer> container; //存储数据的容器public MySnchronizedBlockingQueueS(int maxSize ) {this.maxSize = maxSize;this.container = new LinkedList<>();}/*** 往队列添加元素,如果队列已满则阻塞线程*/public synchronized void put(Integer data){//如果队列已满,则阻塞生产者线程while (container.size()==maxSize){try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//队列未满则添加元素,并通知消费者消费数据container.add(data);notifyAll();}/*** 从队列取出数据,如果队列为空则阻塞* @return 队列元素*/public synchronized Integer take(){//如果队列为空,则消费者停止消费while (container.size()==0){try {wait();} catch (InterruptedException e) {e.printStackTrace();}}//队列不为空则消费数据,并通知生产者继续生产数据int data = container.poll();notifyAll();return data;}public int size(){return container.size();}
}
synchronized无法实现精确通知的效果,而Condition可以达到精确通知哪个线程要被唤醒
Lock+Condition实现
package com.bierce.multiThread;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestProducerConsumer2 {public static void main(String[] args) throws InterruptedException {//创建一个自定义的阻塞队列,其中存储整型的个数为10MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(10, false);int resourceCount = myBlockingQueue.size(); //阻塞队列资源个数//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数myBlockingQueue.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+myBlockingQueue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+myBlockingQueue.take() + ",当前资源池有"+myBlockingQueue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
/***自定义阻塞队列: Lock+Condition实现*/
class MyBlockingQueue<E> {private final Queue queue; //队列容器private final int capacity; //队列容量final ReentrantLock lock; //对象锁private final Condition notEmpty; //等待取出数据条件private final Condition notFull; //等待添加数据条件/*** 初始化阻塞队列* @param capacity 队列容量* @param fair 是否公平锁*/public MyBlockingQueue(int capacity, boolean fair) {this.queue = new LinkedList();this.capacity=capacity;this.lock = new ReentrantLock(fair);this.notEmpty = lock.newCondition();this.notFull = lock.newCondition();}/*** 往队列插入元素,如果队列大小到达容量限制则阻塞* @param e 插入元素* @throws InterruptedException 中断异常*/public void put(E e) throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock(); //上锁try{while (queue.size()==capacity){ //队列已满则阻塞notFull.await();}//队列未满则加入数据并唤醒消费者进行消费queue.add(e);notEmpty.signalAll();} finally {lock.unlock(); //必须释放锁}}/*** 从队列取出一个元素,如果队列为空则阻塞* @return 队列元素* @throws InterruptedException 中断异常*/public E take()throws InterruptedException{final ReentrantLock lock = this.lock;lock.lock();try{while (queue.size()==0){ //队列为空则阻塞notEmpty.await();}//队列有数据则获取数据并唤醒生产者进行生产E element = (E) queue.remove();notFull.signalAll();return element;} finally {lock.unlock(); //必须释放锁}}public int size(){return queue.size();}
}
阻塞队列BlockingQueue实现
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}
}
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}
/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();
}
/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;
}
根据ArrayBlockingQueue的put和take方法源码可知其底层最终使用的仍是Lock+condition机制
package com.bierce.multiThread;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestProducerConsumer3 {public static void main(String[] args) throws InterruptedException {//创建一个阻塞队列,其中存储整型的个数为10BlockingQueue<Integer> queue= new ArrayBlockingQueue<>(10);int resourceCount = queue.size(); //阻塞队列资源个数//System.out.println("资源总数:" + resourceCount);//创建生产者线程Runnable producer=()->{while (resourceCount < 1){try {int random = new Random().nextInt(100); //生成一个0-100的两位整数queue.put(random);System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "生产了一个整型数据==>"+random + ",当前资源池有"+queue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个生产线程 线程名称从0-4new Thread(producer).start();}//创建消费者线程Runnable consumer=()->{while (true){try {System.out.println("北京时间:" + Instant.now() + ":" + Thread.currentThread().getName() + ":" + "消费了一个整型数据==>"+queue.take() + ",当前资源池有"+queue.size()+"个资源");} catch (InterruptedException e) {e.printStackTrace();}}};for (int i = 1; i <= 5; i++) { //创建5个消费线程 线程名称从5-9new Thread(consumer).start();}}
}
扩展
- 通过信号量semaphore实现
- 通过PipedInputStream/PipedOutputStream实现