文章目录
- 💐生产者消费者模型
- 💐模拟实现阻塞队列
- 💡注意点一
- 💡注意点二
阻塞队列是一种“特殊”的数据结构,但是也遵循队列的“先进先出”特性,它的特殊在于:
阻塞队列的两个特性:
1、阻塞队列是线程安全的
2、带有阻塞特性:
a.向队列中添加元素时,如果队列满了,就会阻塞等待,直到其他线程从队列中取走元素时才会解除等待
b.从队列中向外拿元素时,如果队列为空,就会阻塞等待,直到其他线程向队列中添加元素后,才会解除等待
阻塞队列的最大作用就是可以用来实现“生产者消费者模型”;
💐生产者消费者模型
什么是生产者消费者模型?
比如,在过年时包饺子,妈妈负责擀饺子皮,我负责包饺子,妈妈会把擀好的饺子皮放在板子上,此时,我就可以拿饺子皮开始包饺子,当板子上没有饺子皮时,我就要等妈妈擀,如果当板子上的饺子皮满了的时候,妈妈就会停止擀饺子皮,等到板子上有位置了以后,才会继续擀,所以,妈妈就相当于是一个生产者,我就相当于是一个消费者,放饺子皮的板子就相当于是一个阻塞队列。
1、生产者消费者模型可以解决强耦合问题:
什么是强耦合:
例如,有一个简单的分布式系统,客户端向服务器机房发送一个要求后等待服务器响应,但是,这个要求可能需要两个服务器联合进行处理,假设,有一个A服务器和B服务器,A服务器可能需要B服务器的响应后再进行后序的处理,但是呢,如果因为B服务器挂了,那么就会导致A服务器也会挂,所以这种联系较紧密的强耦合模块就很容易出现问题;
针对上述情况,就可以引入一个阻塞队列来解决:
将阻塞队列也封装成一个单独的服务器程序,让A服务器直接和这个阻塞队列进行交互,让B服务器也直接和阻塞队列进行交互,不管B到底会不会出错,也都不会影响到A,A都只会和阻塞队列进行交互,就算以后再增加多个服务器的话,也就只是和阻塞队列进行交互,A也不必进行修改,这样就降低了耦合性👇
2、削峰填谷
”峰“的意思是:短时间内,请求量比较多
”谷”的意思是:请求量比较少
如果在没有引入阻塞队列的情况下(如上图一),如果客户端这边的请求量比较大,那么,不管A接受多少请求量,都会直接发给B,A和B所要承受的压力是一样的,但是,每个服务器都是不一样的,可能就会出现,对于这个并发量,A服务器可以承受,B服务器就承受不了;
对于这种情况就可以用生产者消费者模型解决:
和图二一样,给A发过去的请求量,并不直接发给B,而是发队列,B仍然按照他自己的速度处理请求,举个例子:假如,A和B每秒钟可以处理1000条请求,但是,出现了极端情况,A每秒处理了3000条请求,但是呢,B仍然会继续按照每秒处理1000条请求,这样,请求就会在队列中积压,如果队列满了的话,A向队列中发送请求就发生阻塞,直到B从队列中取出请求,使队列出现空余空间后,A再继续发出请求,就这样,一来一回,虽然可能响应有点慢了,但是总比把B服务器搞挂了好;而后续A服务器发出的请求量少了以后,B服务器就可以慢慢的处理完这些积压的请求;
有了这样的机制后,就可以保证在突发情况突然来临时,服务器仍然能够正常运行
💐模拟实现阻塞队列
class MyBlockingQueue<T> {private Object[] element = new Object[5];//为了避免内存可见性问题,加上一个volatileprivate volatile int head;//记录当前要弹出元素的位置private volatile int tail;//记录当前要添加元素的位置private volatile int size;//向队列中添加一个元素public void put(T elem) {//加锁synchronized(this) {//判断队列是否满了while(size == element.length) {//阻塞等待try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}//没满情况element[tail] = elem;tail++;//判断tail是否是数组最后一个元素if(tail == element.length){tail = 0;}size++;this.notify();}}//弹出队列中的一个元素public T take() {synchronized (this) {//判断队列是否为空while(size == 0) {//阻塞等待try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}//不为空情况T ret = (T)element[head];head++;if(head == element.length) {head = 0;}size--;this.notify();return ret;}}}
💡注意点一
put() 方法里面的wait需要take()里面的notify唤醒,take()方法里面的wait需要put() 里面的notify唤醒,因为,当在put里面调用wait后,就会处于阻塞等待,就算是继续调用put,仍然会阻塞等待,必须调用take里面的notify后,才后解除等待,take也同理,所以不可能出现方法内调用notify解除方法自己的wait这中情况;
💡注意点二
在这里为什么要使用while循环判断?
wait被唤醒的方式除了notify外,还有一种方式就是,如果在其他线程中调用interupt方法可能会中断wait的等待,就如上面代码一样,如果一旦调用了interupt方法,那么wait就会被唤醒,被唤醒之后,代码并没有退出,这时候,如果数组仍然是满的话,那么就会出现bug,此时的tail指向的还是一个有效元素,这个有效元素就会被修改,并且size++,size就超过了数组的长度,所以,利用while语句,一旦wait被唤醒,那么就再次进行一下判断,如果数组是满的,仍然继续等待,反之向下执行,take也同理;