阻塞队列
阻塞队列,字面意思就是带有阻塞功能,也就是这个线程不走了,不再参与cpu的调度,等到合适的时机条件成功时候再继续参与cpu的调度
主要体现在以下两方面
1.当队列满的时候,继续入队列,就会出现阻塞,阻塞到其他线程从队列中取走元素为止。
2.当队列空的时候,继续出队列,就会出现阻塞,阻塞到其他线程往队列中添加元素为止。
阻塞队列用处非常大,在后端开发中,有举足轻重的意义
生产者消费者模型
基于阻塞队列可以实现生产者消费者模型
先举个例子
三个人包饺子,在只有一个擀面杖的情况下
1.每个人,自己擀饺子皮,擀完了之后放下擀面杖再自己包饺子,这种做法,在擀和包之间来回切换,效率比较抵消,增加任务切换开销,假设只有一个擀面杖,三个人都想用,这个无疑又增加了锁竞争
2.一个人负责擀,另外两个负责包
擀的人就是生产者,包饺子的就是消费者,放饺子的那个喷子,就是交易场所
生产者消费者的优势
1.减少任务开销,减少锁竞争
2.解耦合耦合
降低模块之间的耦合,也就是降低模块之间的关联性
举个例子
当然也有弊端
之前是A直接发给B,是一次通信
现在是A发给队列,然后队列发给B,是两次通信
而且消息在队列里不知道有没有阻塞的情况
因此效率降低了
3.削峰填谷
服务器收到的来自客户端/用户的请求,不是一成不变的,可能会因为一些突发事件,引起请求数目暴增
(比如之前鹿晗公布恋情的时候,微博就崩了,短时间内发生了大量的转发)
一台服务器,同一时刻能处理的请求数量是有上限的,不同的服务器承担的上限是不一样的。
机器的硬件资源有限(cpu,内存,硬盘,网络带宽),服务器每处理一次请求,都需要消耗一定的硬件资源,不同服务器配置不同,每个请求消耗的资源也不同。
一个分布式系统中,就经常会出现,有的机器承担压力更大,有的就更小
生产者消费者模型就可以很好的解决
虽然阻塞队列只是一个数据结构,但是正因为生产者消费者模型这么重要,于是大佬们就会把这个数据结构单独实现成一个服务器程序,并且使用单独的主机/主机集群来部署,此时这个所谓的阻塞队列,就进化成了“消息队列”
Java标准库已经提供了现成的阻塞队列来实现了,也就是BlockingQueue
public static void main(String[] args) throws InterruptedException {BlockingDeque<String> queue=new LinkedBlockingDeque<>();//如果括号放个数字,就代表当前阻塞队列最大的容量queue.put("hello");String tmp=queue.take();System.out.println(tmp);String tmp2=queue.take();//第二次take,由于阻塞队列里面没有元素了,因此在这里会发生阻塞情况,也就是程序没结束System.out.println(tmp2);}
下面根据阻塞队列,来实现一个生产者消费者模型
一个线程生产,一个线程消费
public static void main(String[] args){// 搞一个阻塞队列, 作为交易场所BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();// 负责生产元素Thread t1 = new Thread(() ->{int count = 0;while (true){try{queue.put(count);System.out.println("生产元素: " + count);count++;Thread.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}}});// 负责消费元素Thread t2 = new Thread(() ->{while (true){try{Integer n = queue.take();System.out.println("消费元素: " + n);}catch (InterruptedException e){e.printStackTrace();}}});t1.start();t2.start();}
阻塞队列底层代码实现(自己实现一个阻塞队列)
class MyBlockingQueue
{//head,tail,size在下面的读操作中可能因为内存可见性问题读取不到正确的结果,因此加上volatile让编译器知道这三个数字是可变的volatile private int head=0;volatile private int tail=0;volatile private int size=0;String[] arr=new String[100];public void put(String elem) throws InterruptedException{synchronized (this){while(size>=arr.length){//队列满了,触发阻塞this.wait();}arr[tail]=elem;tail++;if(tail>=arr.length){tail=0;}size++;//用来唤醒队列为空的阻塞情况this.notify();}}public String take() throws InterruptedException {synchronized (this){if(size==0){//队列空了,也触发阻塞this.wait();}String elem=arr[head];head++;if(head>=arr.length){head=0;}size--;//用来唤醒队列满的阻塞情况this.notify();return elem;}}
}
public class Demo20
{public static void main(String[] args) throws InterruptedException {MyBlockingQueue queue=new MyBlockingQueue();Thread t1=new Thread(()->{int count=0;while(true){try {queue.put(count+"");System.out.println("生产元素:"+count);count++;Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2=new Thread(()->{while(true){try {String x=queue.take();System.out.println("消费元素:"+x);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();;t2.start();}
}