目录
1.生产者消费者模型
2.使用标准库中的阻塞队列
3.模拟实现阻塞队列
在介绍阻塞队列之前,会先介绍一些前置知识,像队列:有普通队列、优先级队列、阻塞队列、和消息队列。前面两个是线程不安全的,而后面两个是线程安全的。本文重点介绍阻塞队列。
1.生产者消费者模型
1.1队列功能介绍
(1)阻塞队列
1)当队列为空时,尝试出队列;此时,队列会阻塞等待,直到队列不为空才继续执行出队列操作。
2)当队列为满时,尝试入队列;此时,队列就会阻塞等待,直到队列不为满为止才能继续执行入队操作。
(2)消息队列
并非遵循常规的先进先出,而是带有一个topic关键字。当出队时指定某个topic,就会先出topic下的元素(topic内部就遵循先进先出)
举例:例如到医院窗口排队,有很多种类型的窗口,如:妇科、儿科、骨科等等(这些称为topic),不是说,你先来了就一定可以就诊,而是等待你所在的topic是否呼唤你。
像上面的阻塞队列和消息队列,起到的作用就是可以实现:生产者消费者模型。
1.2.生产者消费者模型介绍
(1)什么是生产者消费者模型
1)A线程进行入队操作,B线程进行出队操作。当队列为空时,B线程需要等待A线程入队,才能从队列中取出元素;当队列满时,A线程需要等待B线程取出元素后,才能继续入队。这里的A线程就相当于生产者,B线程相当于消费者。
2)有一个自动售卖机(相当于阻塞队列/消息队列),商人负责填货(生产者),用户负责买东西(消费者)。
(2)模型的作用
1)可以让程序机械能解耦合操作
2)可以程序“削峰填谷”
解耦合作用举例:
1.如果A和B是互相调用的关系,那么如果A中需要修改,那么B中的大部分都需要同步修改,否则无法互相调用。
2.当A和B中间加了一个队列,那么A与B的交互只需要通过操作队列即可。即使其中一个出现了问题,也不会影响到另外一个。
削峰填谷举例:
1.当服务器直接和客户端交互时,当请求过多时,就会直接导致服务器崩溃。
2.如果在客户端和服务器中间加上一个队列,让他们通过队列进行交互。即使请求再多,也不会影响到服务器,最坏的情况也就是队列崩溃。
所以说,阻塞队列/消息队列,就是可以实现生产者消费者模型的效果。
2.使用标准库中的阻塞队列
现在,我们介绍如何调用标准库中的阻塞队列。
2.1.创建阻塞队列
(1)选择正确的接口
(2)实例化的对象
可以选择的有下面这三个,很明显,它们之间只是基于不同的数据结构进行实现。
这三个,我们都是可以选择的。
(3)队列的操作
因为是队列,我们只需要考虑入队和出队操作即可。
在阻塞队列中,只有这两个是带有阻塞功能的,所以我们只需要使用这两个即可。
因为这两个操作,是带有阻塞功能的,也就是wait,所以使用时需要声明异常。
(4)普通的操作
这里普通的操作指的是在一个线程中进行操作。
程序运行起来,发现没有任何的报错,只是程序仍然不会结束,这就是阻塞功能。
2.2.使用阻塞队列
(1)消费者消费的很慢
当消费者消费慢时,也就是让消费者每次sleep,此时,就会产生生产者在等消费者的过程
public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);Thread t1 = new Thread(()->{//负责入队操作for (int i = 0; i < 5000; i++) {try {queue.put(i);System.out.println("入队:"+i);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{//负责出队操作for (int i = 0; i < 5000; i++) {try {int tmp = queue.take();System.out.println("出队:"+tmp);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
(2)生成者生成的很慢
以上就是对阻塞队列的使用,下面我们自己实现一个阻塞队列
3.模拟实现阻塞队列
要模拟阻塞队列,就需要有入队/出队操作,并且可以进行阻塞等待,并且是线程安全的!
我们先从普通队列开始,然后加上线程安全和阻塞操作,最后进行优化和线程安全Pro版
3.1.实现普通的队列
下面按照循环队列的形式进行创建队列,只提供了入队和出队操作
class MyBlockQueue {int head = 0;int tail = 0;int size = 0;public String[] elem;public MyBlockQueue(int capacity) {elem = new String[capacity];}//入队public void put(String s) throws InterruptedException { if(size == elem.length) {return;}elem[tail] = s;//队尾入size++;if(tail >= elem.length-1) tail=0;else tail++;}}//出队public String take() throws InterruptedException {if(size == 0) {return null;}String tmp = elem[head];size--;if(head == elem.length-1) head = 0;else head++;return tmp;}
}
3.2.加上阻塞功能
这里我们使用的是wait而不是sleep。
class MyBlockQueue {int head = 0;int tail = 0;int size = 0;public String[] elem;public MyBlockQueue(int capacity) {elem = new String[capacity];}//入队public void put(String s) throws InterruptedException {synchronized (this) {if(size == elem.length) {System.out.println("队列满,阻塞等待");this.wait();}elem[tail] = s;//队尾入size++;if(tail >= elem.length-1) tail=0;else tail++;this.notify();//入队一个,唤醒一次}}//出队public String take() throws InterruptedException {synchronized (this) {if(size == 0) {System.out.println("队列空");this.wait();}String tmp = elem[head];size--;if(head == elem.length-1) head = 0;else head++;this.notify();//出队一个,唤醒一次return tmp;}}
}
(1)改进1:对入队、出队操作,都进行了加锁操作
(2)改进2:在队列满/空时,不进行return,而是进行阻塞等待;当有一个元素入队/出队之后,就进行唤醒一次。
上述不使用sleep的原因是:sleep是抱着锁使线程进入休眠状态,当此时有其他的操作(入队/出队)时,无法拿到锁,从而无法执行(发生了死锁)
上述还有一个缺点:就是wait不仅仅可以被notify唤醒,还可以被interrupt唤醒,所以要循环进行判断。
class MyBlockQueue {int head = 0;int tail = 0;int size = 0;public String[] elem;public MyBlockQueue(int capacity) {elem = new String[capacity];}//入队public void put(String s) throws InterruptedException {synchronized (this) {while (size >=elem.length) {//循环等待确认,防止不是被notify唤醒try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}elem[tail] = s;//队尾入size++;if(tail >= elem.length-1) tail=0;else tail++;this.notify();}}//出队public String take() throws InterruptedException {synchronized (this) {while (size ==0) {try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}String tmp = elem[head];size--;if(head == elem.length-1) head = 0;else head++;this.notify();return tmp;}}
}
(3)改进3:使用while+wait的方式反复确认是否要被唤醒
3.3.确保线程一定安全
上述线程其实已经很安全了,但是还需要再进一步优化,达到更安全的效果。对于线程安全还有两个:内存可见性问题和指令重排序,所以我们只需要对变量加上volatile关键字即可。
volatile int head = 0;
volatile int tail = 0;
volatile int size = 0;
上述就是一个完整的阻塞队列模拟实现的代码,下面展示完整代码:
class MyBlockQueue {/* int head = 0;int tail = 0;int size = 0;*/volatile int head = 0;volatile int tail = 0;volatile int size = 0;public String[] elem;public MyBlockQueue(int capacity) {elem = new String[capacity];}//入队public void put(String s) throws InterruptedException {synchronized (this) {/* if(size == elem.length) {System.out.println("队列满,阻塞等待");this.wait();}*/while (size >=elem.length) {//循环等待确认,防止不是被notify唤醒try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}elem[tail] = s;//队尾入size++;if(tail >= elem.length-1) tail=0;else tail++;this.notify();}}//出队public String take() throws InterruptedException {synchronized (this) {/* if(size == 0) {System.out.println("队列空");this.wait();}*/while (size ==0) {try {this.wait();}catch (InterruptedException e) {e.printStackTrace();}}String tmp = elem[head];size--;if(head == elem.length-1) head = 0;else head++;this.notify();return tmp;}}
}
测试代码:
public static void main(String[] args) throws InterruptedException {MyBlockQueue myBlockQueue = new MyBlockQueue(10);Thread t1 = new Thread(()->{for (int i = 0; i < 20; i++) {try {System.out.println("生成1:");myBlockQueue.put("1");Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(()->{for (int i = 0; i < 30; i++) {String tmp = null;try {tmp = myBlockQueue.take();} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费:"+tmp);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
测试结果:
结果是可以的,和标准库中的阻塞队列基本一致。
几个注意事项: