阻塞队列
阻塞队列,也是一个队列 ~~ 先进先出
实际上有一些特殊的队列,不一定非得遵守先进先出的 ~~ 优先级队列(PriorityQueue)
阻塞队列,也是特殊的队列,虽然也是先进先出的,但是带有特殊的功能: 阻塞
-
如果队列为空,执行出队列操作,就会阻塞.阻塞到另一个线程往队列里添加元素(队列不空)为止.
-
如果队列满了,执行入队列操作,也会阻塞.阻塞到另一个线程从队列取走元素位置(队列不满)
消息队列,也是特殊的队列.相当于是在阻塞队列的基础上,加上了个"消息的类型”按照制定类别进行先进先出.此时我们谈到的这个消息队列,仍然是一个“数据结构”.
因为这个消息队列,太好用了 ~~ 因此就有大佬把这样的数据结构,单独实现成了一个程序,这个程序,可以同过网络的方式和其它程序进行通信.
这时这个消息队列,就可以单独部署到一组服务器(分布式).存储能力和转发能力都大大提升了.
很多大型项目里,都可以看到这样的消息队列的身影.
此时消息队列,就已经成为了一个可以和MySQL,Redis这种相提并论的一个重要组件(“中间件”)
rabbit mq 就是消息队列中一种典型实现.还有其他的实现. active mq, rocket mq, kafka ……. 都是业界知名的消息队列.
要想认识清楚消息队列,还是得先认识清楚“阻塞队列”
为什么消息队列这么好用了? 和阻塞队列阻塞的特性关系非常大!!!
基于这样的特性,可以实现“生产者消费者模型”
过年.有个环节就是年夜饭,包饺子 ~~ 一家人在一起包饺子 ~~ 有些地方过年是吃汤圆的
包饺子的环节:挨饺子皮+包饺子
两种典型的包法:
- 每个人都分别进行擀饺子皮 + 包饺子 操作
- 一个人专门负责擀饺子皮,每次擀好一个皮,就放到盖帘上,其他人负责包,每次都从盖帘上取一个皮进行包 => 这种方式就称为生产者消费者模型.
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等
待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取.
此时,负责擀饺子皮的人,就是生产者.
负责包饺子的人,就是消费者.
盖帘就是阻塞队列.
如果擀饺子皮的人擀得太慢了,包饺子的人就得等.
如果擀饺子皮的人擀得太快了,包饺子的人一时就包不完,擀饺子皮的人就得停下来等一会儿
生产者消费者模型,能给我们的程序带来两个非常重要的好处!!!
1.实现了发送方和接收方之间的"解耦"
~~ 降低耦合的过程,就叫做“解耦"
耦合两个模块之间的关联关系是强还是弱
~~ 关联越强,耦合越高
开发中典型的场景:服务器之间的相互调用.
3.可以做到“削峰填谷”,保证系统的稳定性
三峡大坝 ~~ 起到的效果,就是削峰填补
洪灾就是雨量达到峰值了,上游水量增长了,大坝把水拦住,让下游仍然有一个比较平缓的流量
到了旱季,三峡大坝开闸放水.
服务器的开发,也和上述这个模型是非常相似的.
上游,就是用户发送的请求.
下游就是一些执行具体业务的服务器.
用户发多少请求?是不可控的.有的时候,请求多,有的时候请求少…
这时就涉及到了,热搜(比如什么微博热搜什么的)这个概念.
那个词会成为热搜,这是和社会现象有关的,比如新冠期间,”疫情”…就是热搜词.
说不定某个瞬间,很多用户都要来给你发起请求 ~~ 服务器处理每个请求,都需要消耗硬件资源,包括不限于(CPU,内存,硬盘,带宽…),如果某个硬件资源达到瓶颈,此时服务器就挂了,这就给系统的稳定性带来风险!!!
使用生产者消费者模型,就是一个有效的手段!!!
使用标准库提供的阻塞队列
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {}
LinkedBlockingQueue<> (java.util.concurrent)
=> 基于链表实现的阻塞队列
priorityBlockingQueue<> (java.util.concurrent)
=> 带有优先级的阻塞队列 ~~ 基于数据结构堆实现的
ArrayBlockingQueue<> (java.utii.concurrent)
=> 基于数组实现的阻塞队列
Queue提供的方法有三个:
- 入队列 ~~ offer
- 出队列 ~~ poll
- 取队首元素 ~~ peek
阻塞队列主要方法是两个:
- 入队列(带有阻塞功能) ~~ put
- 出队列(带有阻塞功能) ~~ take
public static void main(String[] args) throws InterruptedException {BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>();blockingDeque.put("fly0213"); // 入队列String res = blockingDeque.take(); // 出队列. System.out.println(res);res = blockingDeque.take(); // 队列中没有元素了, take, 就会阻塞.System.out.println(res);
}
现在基于标准库的阻塞队列,写一个简单的生产者消费者模型的代码
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;/*** Created with IntelliJ IDEA.* Description:* User: fly(逐梦者)* Date: 2023-10-02* Time: 14:30*/
public class ThreadDemo22 {public static void main(String[] args) {BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();// 创建两个线程, 来作为生产者和消费者Thread customer = new Thread(() -> {while (true) {try {Integer result = blockingDeque.take();System.out.println("消费元素: " + result);} catch (InterruptedException e) {throw new RuntimeException(e);}}});customer.start();Thread producer = new Thread(() -> {int count = 0;while(true){try {blockingDeque.put(count);System.out.println("生产元素: "+ count);count++;Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();}
}
自己实现一个简单的阻塞队列
~~ 通过编写阻塞队列的代码,来更好的理解多线程
实现一个阻塞队列,需要分三步:
- 先写一个普通的队列,队列的实现可以基于数组,也可以基于链表(易于实现头删/尾插).
- 加上线程安全.
- 加上阻塞功能.
注: 基于链表实现普通队列的时候,需要头删和尾插的时间复杂度都是O(1).
链表头删的时间复杂度本来就是O(1),无需注意,只是链表的尾删操作时间复杂度要实现O(1),需要用一个额外的引用,记录当前的尾结点 ~~ 相关代码没什么难度,博主就不做讲解了!
用数组循环对列的方式来实现阻塞队列
1. 实现普通队列的代码
/*** Created with IntelliJ IDEA.* Description:* User: fly(逐梦者)* Date: 2023-10-02* Time: 15:08*/// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {private int[] items = new int[1000];private int head = 0;private int tail = 0;private int size = 0;// 入队列public void put(int value) {if (size == items.length) {// 队列满了, 不能继续插入return;}items[tail] = value;tail++;// 对 tail 进行处理// 第一种写法// tail = tail % items.length;// 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )if (tail >= items.length) {tail = 0;}size++;}// 出队列public Integer take() {if (size == 0) {// 队列空, 不能出队列return null;}int result = items[head];head++;if (head >= items.length) {head = 0;}size--;return result;}
}public class ThreadDemo23 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();queue.put(1);queue.put(2);queue.put(3);queue.put(4);int result = 0;result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);}
}
2. 阻塞功能 ~~ 意味着队列要在多线程环境下使用
保证线程安全,主要是就是加上锁 ~~ 使用 synchronized 包裹put()
和take()
方法里的所有代码,当然, synchronized 加到方法上,也是可以的.
// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {private int[] items = new int[1000];private int head = 0;private int tail = 0;private int size = 0;// 入队列public void put(int value) {synchronized (this) {if (size == items.length) {// 队列满了, 此时要产生阻塞// return;try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}items[tail] = value;tail++;// 对 tail 进行处理// 第一种写法// tail = tail % items.length;// 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )if (tail >= items.length) {tail = 0;}size++;// 这个 notify 唤醒 take 中的 waitthis.notify();}}// 出队列public Integer take() {int result = 0;synchronized (this) {if (size == 0) {// 队列空, 此时也需要阻塞try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}return null;}result = items[head];head++;if (head >= items.length) {head = 0;}size--;// 唤醒 put 中的 waitthis.notify();}return result;}
}
这两个线程中的 wait 是否可能会同时触发 ???
~~ 如果同时触发了,显然就不能正确相互唤醒了
答案是否定的,针对同一个队列,不能够既是满,又是空,除非它跟薛定谔的猫一样,嘿嘿!!!
3.上述代码还有一个瑕疵
if (size == items.length) { this.wait(); }
当wait被唤醒的时候,此时if的条件,一定就不成立了嘛??
具体来说, put中的 wait 被唤醒后,要求队列没有满,
但是 wait 被唤醒了之后,队列一定是不满的嘛?
在当前代码中,虽然不会出现这种情况.当前代码一定是取元素成功才唤醒,每次取元素都会唤醒.
但是稳妥起见,最好的办法,是wait返回之后再次判定一下,看此时的条件是不是具备了!!!
例子: 类似于,早上起床上课,要定个8:20的闹钟.正常情况下,闹钟响了,就该起床了.
但是如果7:00就醒了,就发现,距离上课还早(条件尚未满足),还可以再睡会(每次醒了,应该都看下时间,判定一下当前是否满足了起床的条件).
if (size == items.length) {this.wait(); if (size == items.length) {this.wait();}
}
这么写是进行了二次判定了,但是并不合适.可能第二次wait被唤醒,可能还是条件不具备~~
4.完整版的代码
/*** Created with IntelliJ IDEA.* Description:* User: fly(逐梦者)* Date: 2023-10-02* Time: 15:08*/// 自己写的阻塞队列
// 注: 此处不考虑泛型, 直接使用 int 来表示元素类型
class MyBlockingQueue {private int[] items = new int[1000];private int head = 0;private int tail = 0;private int size = 0;// 入队列public void put(int value) {synchronized (this) {while (size == items.length) { // 标准库就建议这么写!!!// 队列满了, 此时要产生阻塞// return;try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}items[tail] = value;tail++;// 对 tail 进行处理// 第一种写法// tail = tail % items.length;// 第二种写法 ( 更推荐, 可读性更高 & % 在效率上没有优势 )if (tail >= items.length) {tail = 0;}size++;// 这个 notify 唤醒 take 中的 waitthis.notify();}}// 出队列public Integer take() {int result = 0;synchronized (this) {while (size == 0) {// 队列空, 此时也需要阻塞try {this.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}result = items[head];head++;if (head >= items.length) {head = 0;}size--;// 唤醒 put 中的 waitthis.notify();}return result;}
}public class ThreadDemo23 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();queue.put(1);queue.put(2);queue.put(3);queue.put(4);int result = 0;result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);result = queue.take();System.out.println("result = " + result);}
}