目录
问题引出
一.单端阻塞队列(BlockingQueue)
二.双端阻塞队列(BlockingDeque)
三.延迟队列(DelayQueue)
问题引出
由于实现消费者-生产者模型,每一次实现都比较麻烦,比如sychronized的同步处理,或者通过锁实现。这些实现起来都比较繁琐,为了简单就能实现这种模型,JUC提供了阻塞队列接口:BlockingQueue(单端阻塞队列)和BlockingDeque(双端阻塞队列)
一.单端阻塞队列(BlockingQueue)
原理:通过使用FIFO模式处理的集合结构
什么是FIFO?
FIFO(First-In, First-Out)是一种常见的处理数据的方式,也被称为先进先出模式。在FIFO模式中,首先进入队列的数据首先被处理,而最后进入队列的数据最后被处理。
可以将FIFO模式理解为排队等候的情景,比如在超市的收银台,顾客按照先后顺序排队结账。当一个顾客结完账离开后,下一个顾客才能开始结账。这就是FIFO模式的处理顺序。
在计算机科学中,FIFO模式通常用于数据缓冲区、队列和调度算法等场景。例如,在操作系统中,进程调度算法可以使用FIFO模式,根据进程到达的先后顺序来决定执行顺序;在网络通信中,消息队列可以使用FIFO模式确保消息按照发送的先后顺序被接收和处理。
单端阻塞队列BlockQueue的常用方法:
方法 | 描述 |
---|---|
put(item) | 将指定的项放入队列中,如果队列已满则阻塞,直到有空间可用 |
take() | 从队列中获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
offer(item) | 尝试将指定的项放入队列中,如果队列已满则立即返回false,否则返回true |
poll(timeout) | 从队列中获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
peek() | 返回队列中的第一个项,但不对队列进行修改,如果队列为空则返回null |
size() | 返回队列中当前的项数 |
isEmpty() | 检查队列是否为空 |
isFull() | 检查队列是否已满 |
clear() | 清空队列,移除所有的项 |
单端阻塞队列接口BlockingQueue提供多个子类ArrayBlockingQueue(数组结构)、LinkedBlockingQueue(链表单端阻塞队列)、PriorityBlockingQueue(优先级阻塞队列)、SynchronousQueue(同步队列)
ArrayBlockingQueue
案例代码:
上述代码修改后如下
package Example2129;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class javaDemo {public static void main(String[] args){
// 创建对象和资源量BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
// 创建一个包含各种美食的String数组String[] foods = {"披萨","汉堡", "寿司", "墨西哥炸玉米卷", "牛排", "意大利面", "烤鸭", "富士山寿司", "印度咖喱", "巴西烤肉",};int id;
// 两个厨师for (int i=0;i<2;i++){id = i;new Thread(()->{for (int j=0;j<10;j++){try {
// 模拟做菜时间TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"已经做完菜肴"+foods[j]+"并端上座子");queue.put(foods[j]);}catch (Exception e){e.printStackTrace();}}},"厨师"+id).start();}for (int i=0;i<10;i++){id = i;new Thread(()->{for (int j=0;j<2;j++){try {
// 模拟客人吃饭的时间TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()+"享用完"+queue.take()+"这道菜");}catch (Exception e){e.printStackTrace();}}},"客人"+i).start();}}
}
注意:阻塞队列虽然解决了数据存满则线程等待的情况,但是并没有解决线程并发的问题
LinkedBlockingQueue
案例代码:
package Example2130;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class javaDemo {public static void main(String[] args) {
// 设置容量BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);Random random = new Random();new Thread(()->{while (true){try {if (queue.size()==2){System.out.println("队列已满");TimeUnit.SECONDS.sleep(1);}else {TimeUnit.SECONDS.sleep(random.nextInt(3));System.out.println("存入数据");queue.put("存入数据");}} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(()->{while (true){try {TimeUnit.SECONDS.sleep(random.nextInt(3));if (queue.isEmpty()){System.out.println("队列空了啊");TimeUnit.SECONDS.sleep(1);}else {System.out.println("取出数据");queue.take();}} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}
PriorityBlockingQueue
package Example2131;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;public class javaDemo {public static void main(String[] args) {BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();Random random = new Random();new Thread(()->{try {for (int i=0;i<5;i++){queue.put(random.nextInt(10));}} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();new Thread(()->{try {for (int i=0;i<5;i++){System.out.println("取出数据"+queue.take());}} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}
}
PriorityBlockingQueue
的特点是:
- 元素按照优先级进行排序。在示例中,较小的数字具有较高的优先级。
- 插入和移除操作的时间复杂度为O(logN),其中N为队列中的元素个数。
SynchronousQueue
package Example2132;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;public class javaDemo {public static void main(String[] args){
// 创建对象和资源量BlockingQueue<String> queue = new SynchronousQueue<>();
// 创建一个包含各种美食的String数组String[] foods = {"披萨","汉堡", "寿司", "墨西哥炸玉米卷", "牛排", "意大利面", "烤鸭", "富士山寿司", "印度咖喱", "巴西烤肉",};int id;
// 两个厨师new Thread(()->{for (int j=0;j<10;j++){try {
// 模拟做菜时间TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"已经做完菜肴"+foods[j]+"并端上座子");queue.put(foods[j]);}catch (Exception e){e.printStackTrace();}}},"厨师").start();for (int i=0;i<10;i++){id = i;new Thread(()->{try {
// 模拟客人吃饭的时间TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()+"享用完"+queue.take()+"这道菜");}catch (Exception e){e.printStackTrace();}},"客人"+i).start();}}
}
SynchronousQueue
的特点是:
- 队列没有容量,每次插入操作必须等待对应的删除操作,反之亦然。
- 插入和删除操作是成对的,即一个元素的插入必须等待其被消费取出。
实现子类之间的区别:
-
ArrayBlockingQueue
(数组结构阻塞队列):- 基于数组实现的有界队列,具有固定容量。
- 具有公平(FIFO)和非公平(默认)两种策略的可选择性。
- 内部使用单个锁来实现线程安全。
- 插入和移除元素的时间复杂度为O(1)。
-
LinkedBlockingQueue
(链表单端阻塞队列):- 基于链表实现的可选有界或无界队列。
- 默认情况下是无界的,但可以指定最大容量来创建有界队列。
- 内部使用两个锁来实现线程安全,一个用于插入操作,一个用于移除操作。
- 插入和移除元素的时间复杂度为O(1)。
-
PriorityBlockingQueue
(优先级阻塞队列):- 基于堆实现的无界优先级队列。
- 元素按照优先级进行排序,优先级通过元素的自然顺序或者自定义比较器进行确定。
- 内部不允许存储
null
元素。 - 插入和移除元素的时间复杂度为O(logN),其中N为队列中的元素个数。
-
SynchronousQueue
(同步队列):- 一个没有缓冲区的阻塞队列,用于线程之间直接传输元素。
- 每个插入操作必须等待相应的移除操作,反之亦然。
- 队列本身不存储元素,仅用于线程之间的数据传递。
- 插入和移除操作通常具有较高的可伸缩性性能。
二.双端阻塞队列(BlockingDeque)
BlockingDeque ,可以实现FIFO与FILO操作
什么是FILO?
- FILO(First-In, Last-Out)是一种数据处理方式,也被称为后进先出模式。在FILO模式中,最后进入的数据会首先被处理,而最先进入的数据会最后被处理。
- 可以将FILO模式理解为堆叠物品的情景,比如在一个书架上放置书籍。当我们将一本新书放在书架上时,它会被放在已有书籍的顶部,因此最后放置的书会处于最上方。当我们需要取出一本书时,会优先从顶部取出最后放置的那本书。这符合FILO模式的处理顺序。
- 在计算机科学中,FILO模式常用于栈(Stack)数据结构的操作。栈是一种具有特定数据插入和删除规则的数据结构,最后插入的数据会成为栈顶,最先插入的数据会成为栈底。当需要访问或移除数据时,我们通常会先操作栈顶的数据。
- 总之,FILO模式即后进先出模式,用于保持数据处理顺序的一种方式。类似于堆叠物品或栈数据结构,最后进入的数据会首先被处理,而最先进入的数据会最后被处理。
BlockingDeque 的常用方法:
方法 | 描述 |
---|---|
addFirst(item) | 将指定的项添加到双端队列的开头,如果队列已满则抛出异常 |
addLast(item) | 将指定的项添加到双端队列的末尾,如果队列已满则抛出异常 |
offerFirst(item) | 尝试将指定的项添加到双端队列的开头,如果队列已满则立即返回false,否则返回true |
offerLast(item) | 尝试将指定的项添加到双端队列的末尾,如果队列已满则立即返回false,否则返回true |
putFirst(item) | 将指定的项放入双端队列的开头,如果队列已满则阻塞,直到有空间可用 |
putLast(item) | 将指定的项放入双端队列的末尾,如果队列已满则阻塞,直到有空间可用 |
pollFirst(timeout) | 从双端队列的开头获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
pollLast(timeout) | 从双端队列的末尾获取并移除一个项,在指定的超时时间内如果队列为空则返回null |
takeFirst() | 从双端队列的开头获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
takeLast() | 从双端队列的末尾获取并移除一个项,如果队列为空则阻塞,直到有项可取 |
getFirst() | 返回双端队列的开头项,但不对队列进行修改,如果队列为空则抛出异常 |
getLast() | 返回双端队列的末尾项,但不对队列进行修改,如果队列为空则抛出异常 |
peekFirst() | 返回双端队列的开头项,但不对队列进行修改,如果队列为空则返回null |
peekLast() | 返回双端队列的末尾项,但不对队列进行修改,如果队列为空则返回null |
size() | 返回双端队列中当前的项数 |
isEmpty() | 检查双端队列是否为空 |
clear() | 清空双端队列,移除所有的项 |
双端阻塞队列只有一个实现的子类LinkedBlockingDeque
案例代码:
package Example2133;import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;public class javaDemo {public static void main(String[] args) {BlockingDeque<Integer> deque = new LinkedBlockingDeque<>();new Thread(()->{for (int i=0;i<10;i++){try {deque.putFirst(i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(()->{while (true){try {TimeUnit.SECONDS.sleep(1);System.out.println(deque.takeLast());} catch (InterruptedException e) {throw new RuntimeException(e);}if (deque.isEmpty()){System.out.println("队列空了啦");break;}}}).start();}
}
可以看到双端情况下可以将数据放在头或者尾,获取也可以获取头和尾
三.延迟队列(DelayQueue)
在JUC中提供自动弹出数据延迟的队列DelayQueue,该类属于BlockingQueue的实现子类。如果是创建类对象插入到延迟队列中的话,类需要继承Delayed,并且覆写 compareTo()和getDelay()方法
原理:
延迟时间计算:每个元素实现了 Delayed 接口,该接口定义了一个 getDelay(TimeUnit unit) 方法,用于计算当前元素距离延迟时间还有多长时间。这个方法返回一个 long 类型的时间值,表示时间单位内的延迟时间。
队列存储:内部使用有序优先队列(PriorityQueue)来存储元素。元素将根据它们的延迟时间进行排序,即最小的延迟时间的元素将排在队头。
元素添加:调用 offer(E e) 方法将一个元素添加到队列中。插入元素时,根据其延迟时间,决定其位置。
元素获取:调用 take() 方法从队列中取出延迟时间到达的元素。如果队列为空,则线程阻塞等待,直到有元素可以取出。
添加与移除的同步:对队列的添加和移除操作进行同步,以确保多线程环境下的安全性。
定时删除:元素在队列中的保存时间一旦超过其延迟时间,将会被自动删除。
常用方法:
方法名 | 描述 |
---|---|
enqueue(item, delay) | 将指定的 item 入队,并在 delay 毫秒后执行。 |
dequeue() | 出队并返回最早的延迟任务。 |
getDelay(item) | 返回指定 item 的剩余延迟时间(以毫秒为单位),如果 item 已经过期则返回负数。 |
remove(item) | 从队列中移除指定的 item 。 |
size() | 返回队列中延迟任务的数量。 |
isEmpty() | 判断队列是否为空。 |
clear() | 清空队列,移除所有的延迟任务。 |
getExpiredItems(now) | 返回所有已过期的任务,并从队列中移除它们。 |
getNextExpiringItem() | 返回下一个即将过期的任务,但不从队列中移除它。 |
案例代码:
package Example2134;import org.jetbrains.annotations.NotNull;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;class Student implements Delayed {private String name;
// 设置停留时间private long delay;
// 设置离开时间private long expire;Student(String name, long delay , TimeUnit unit){this.name=name;this.delay = TimeUnit.MILLISECONDS.convert(delay,unit);this.expire = System.currentTimeMillis()+this.delay;}@Overridepublic String toString() {return this.name+"同学已经到达预计停留的时间"+TimeUnit.SECONDS.convert(this.delay,TimeUnit.MILLISECONDS)+"秒,已经离开了";}// 延迟时间计算@Overridepublic long getDelay(@NotNull TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}// 队列弹出计算@Overridepublic int compareTo(@NotNull Delayed o) {return (int) (this.delay-this.getDelay(TimeUnit.MILLISECONDS));}
}public class javaDemo {public static void main(String[] args) throws InterruptedException {BlockingQueue<Student> students = new DelayQueue<Student>();students.put(new Student("黄小龙",3,TimeUnit.SECONDS));students.put(new Student("张三",1,TimeUnit.SECONDS));students.put(new Student("李四",5,TimeUnit.SECONDS));while (!students.isEmpty()){Student stu = students.take();System.out.println(stu);TimeUnit.SECONDS.sleep(1);}}
}