在开始介绍 LinkedBlockingDeque 之前, 我们先看一下 LinkedBlockingDeque 的类图:
从其中可以看出他直接实现了 BlockingDeque 接口, 而 BlockingDeque 又实现了 BlockingQueue 的接口, 所以它本身具备了队列的特性。
而实现 BlockingDeque 使其在 BlockingQueue 的基础上多了 Deque 的功能。
什么是 Deque 呢?
Deque(双端队列)是一种具有两端插入和删除操作的数据结构, 允许在队列的前端和后端执行高效的操作。
Deque 是“双端队列”(Double-Ended Queue)的缩写, 其特点在于可以在两端同时进行元素的插入和删除操作, 这使得它在许多应用中具有卓越的灵活性和性能。
我们知道队列只能是头进尾出, 而 Deque 2 边都支持进出, 这个特性更加的灵活。
实现了 BlockingDeque 的 LinkedBlockingDeque 其本身具备了以下特点
- 不支持 null 元素: 存入 null 元素会抛出异常
- 线程安全性: 在内部中通过 ReentrantLock 保证多线程环境下的安全访问
- 无界容量: 在不指定容量的情况下, 默认为 Integer.MAX_VALUE, 理论上的无限容量
- 阻塞操作: 当 LinkedBlockingDeque 的操作尝试在空队列中执行移除操作或在满队列中执行插入操作时, 它会导致操作的阻塞
1 实现的数据结构
内部的实现结构就是一个链表, 但是为了支持 2 端都可以出入队操作, 所以是一个双向链表。
2 源码分析
2.1 LinkedBlockingDeque 链表节点的定义
我们知道 LinkedBlockingDeque 的底层实现结构就是一个链表, 而链表绕不开的一个概念就是节点, 所以我们先来看一下 LinkedBlockingDeque 的节点定义。
public class LinkedBlockingDeque<E> {/*** 节点类, 用于存储数据*/static class Node<E> {// 存储的内容E item;// 上一个节点的指针Node<E> prev;// 下一个节点的指针Node<E> next;Node(E x) {item = x;}}
}
上面的 Node 就是队列中的节点了, 声明的属性
item: 存储的内容
prev: 上一个节点的指针
next: 下一个节点指针
可以看出 LinkedBlockingDeque 是一个具备双向操作的链表
2.2 LinkedBlockingQueue 持有的属性
public class LinkedBlockingDeque<E> {/** 队列的第一个元素的节点, 也就是头节点*/transient Node<E> first;/** 队列的最后一个元素节点, 也就是尾节点 */transient Node<E> last;/** 当前队列中的元素个数 */private transient int count;/** 队列的大小, 默认为 Integer.MAX_VALUE */private final int capacity;/** 所有操作队列元素节点时使用的锁 */final ReentrantLock lock = new ReentrantLock();// 非空, 也就是数组中重新有数据了, 可以继续取数据了// 当某个线程尝试从当前的队列中获取元素时, 如果数组中没有数据, 会把这个线程放到这个等待条件中// 在另外一个线程中添加元素到数组中, 会唤醒阻塞在这个等待条件中的线程private final Condition notEmpty = lock.newCondition();// 非满, 也就是数组中的数据又没有达到上限了, 可以继续添加数据// 当某个线程尝试向当前的队列添加元素, 但是当前数组已经满了, 会把这个线程放到这个等待条件中// 在另一个线程中从当前队列中获取一个元素时, 会唤醒等待在这个等待队列中的线程private final Condition notFull = lock.newCondition();}
通过上面属性的声明, 可以发现 LinkedBlockingDeque 和 ArrayBlockingQueue 类似, 都是通过一个可重入锁控制并发, 同时通过 2 个 Condition 实现在队列空或满时挂起或唤醒线程。
再来看表示队列容量的 count, 就是简单的用一个 int 表示, 没有用 volatile 修饰, 可以保证线程的可见性吗?
在 LinkedBlockingQueue 容量的大小却是的用 AtomicInteger 来表示, 为什么不是同样简单用一个 int 呢?
这 2 个问题都可以用 Happens-Before 的监视器原则来解释。
在 LinkedBlockingDeque 中只用了一把锁, 同时 count 的修改也只有在线程持有锁的过程中变更, 所以能够保证可见性, 而监视器原则,
确保了同一把锁的解锁 Happens-Before 加锁, 也就是说在 A 线程解锁之前的操作对于 B 线程获取同一把锁后都是可见的。
而 LinkedBlockingQueue 中使用了 2 把锁, 一个用于添加元素, 一个用于移除元素, 所以 2 把锁之间的加锁和解锁不能保证可见性, 所以使用了 AtomicInteger, 利用其内部的 CAS 操作来保证可见性。
2.3 LinkedBlockingQueue 构造函数
public class LinkedBlockingDeque<E> {// 无参构造函数public LinkedBlockingDeque() {// 调用自身指定容量的构造函数, 默认为 int 的最大值this(Integer.MAX_VALUE);}// 指定容量的构造函数public LinkedBlockingDeque(int capacity) {// 指定的容量小于等于 0, 直接抛出异常if (capacity <= 0)throw new IllegalArgumentException();this.capacity = capacity;}// 指定一个集合的构造函数public LinkedBlockingDeque(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock lock = this.lock;// 加锁lock.lock();try {for (E e : c) {// 元素不能为空if (e == null)throw new NullPointerException();// 把当前的数据封装为 Node 节点, 放到了队列的尾部, 添加失败就抛出异常if (!linkLast(new Node<E>(e)))throw new IllegalStateException("Deque full");}} finally {lock.unlock();}}private boolean linkLast(Node<E> node) {// 队列中的元素个数大于等于容量上限了, 直接返回 falseif (count >= capacity)return false;// 获取尾节点Node<E> l = last;// 新增节点的上一个节点为原本的尾节点node.prev = l;// 当前的尾节点等于新增的节点last = node;// 头节点为空, 原本链表没有数据, 新增节点就是头节点if (first == null)first = node;else// 头节点不为空, 原本链表有数据, 那么把原本的尾节点的下一个节点指向新增的节点l.next = node;// 队列中的元素个数 + 1++count;// 队列有数据了, 唤醒阻塞在非空等待条件上的线程notEmpty.signal();return true;}
}
第一个构造函数无参, 那么创建的队列的容量就是 Integer.MAX_VALUE, 也就是理论上的无限容量。
第二个构造函数可以指定队列大小。
第三个构造函数传入一个集合, 这样也会创建出理论无限容量的队列。
同理里面使用了 ReentrantLock 来加锁, 然后把传入的集合元素按顺序一个个放入 items 中。
这里加锁目的不是使用它的互斥性, 而是让修改后的头节点 first 和尾节点 last 的变动, 对其他线程可见, 也就是使用了 ReentrantLock 的监视器原则。
2.4 LinkedBlockingDeque 支持的方法
2.4.1 数据入队方法
LinkedBlockingDeque 提供了多种入队操作的实现来满足不同情况下的需求, 入队操作有如下几种:
- add(E e) / addFirst(E e) / addLast(E e)
- offer(E e) / offerFirst(E e) / offerLast(E e)
- offer(E e, long timeout, TimeUnit unit) / offerFirst(E e, long timeout, TimeUnit unit) / offerLast(E e, long timeout, TimeUnit unit)
- put(E e) / putFirst(E e) / putLast(E e)
add() 相关的方法
public class LinkedBlockingDeque<E> {public boolean add(E e) {// 调用到添加到尾部的方法addLast(e);return true;}public void addFirst(E e) {// 调用 offerFirst 的方法if (!offerFirst(e))throw new IllegalStateException("Deque full");}public void addLast(E e) {// 调用 offerLast 的方法if (!offerLast(e))throw new IllegalStateException("Deque full");}}
add() 相关的方法都是直接调用 offer() 对应的方法, 比如 add 对应 offer, addFirst 对应 offerFirst, 所以我们只需要看 offer() 方法的实现就可以了。
offer() 相关的方法
public class LinkedBlockingDeque<E> {public boolean offer(E e) {// 调用自身的 offerLast 方法, 添加到队列的尾部return offerLast(e);}public boolean offerFirst(E e) {// 添加的数据不能为空if (e == null) throw new NullPointerException();// 封装为 Node 节点 Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 添加到队列的头部return linkFirst(node);} finally {lock.unlock();}}public boolean offerLast(E e) {// 添加的数据不能为空if (e == null) throw new NullPointerException();// 封装为 Node 节点 Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 添加到队列的尾部return linkLast(node);} finally {lock.unlock();}}private boolean linkFirst(Node<E> node) {// 节点个数大于等于容量上限了, 直接返回if (count >= capacity)return false;// 获取头节点 Node<E> f = first;// 新增的节点的下一个节点为当前的头节点node.next = f;// 当前的头节的设置为新增的节点first = node;// 当前的尾节点为空, 表示原本链表中没有数据, 把新增的节点同时设置为尾节点if (last == null)last = node;else// 为节点不为空, 表示原本链表中有数据了// 那么只需要把 旧的头节点的上一个节点指向新增的节点f.prev = node;// 队列中的元素个数 + 1 ++count;// 唤醒阻塞在非空等待条件中的线程notEmpty.signal();return true;}private boolean linkLast(Node<E> node) {if (count >= capacity)return false;// 获取尾结点 Node<E> l = last;// 新增节点的上一个节点为原本的尾节点node.prev = l;// 当前的尾节点等于新增的节点last = node;// 头节点为空, 原本链表没有数据if (first == null)// 将头节点设置为新增的节点first = node;else// 尾节点不为空, 原本队列有数据了, 把原本尾节点的下一个节点设置为新增的节点l.next = node;// 队列中的元素个数 + 1 ++count;// 唤醒阻塞在非空等待条件中的线程notEmpty.signal();return true;}
}
offer() 相关的方法的实现很简单, 最终就是调用到内部的 linkFirst() 和 linkLast() 方法, 而 2 个就是链表的节点新增操作。
而 offer() 相关的带超时时间的方法就不展开了, 原理和 LinkedBlockingQueue 一样, 利用了 Condition 的 awaitNanos 进行超时等待, 并在外面用 while 循环控制等待时的中断问题。
put() 相关的方法
public class LinkedBlockingDeque<E> {public void put(E e) throws InterruptedException {// 调用到 putLast 方法putLast(e);}public void putLast(E e) throws InterruptedException {// 添加的数据不能为空if (e == null) throw new NullPointerException();// 将当前的数据封装为 Node 节点 Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {// 阻塞等待 linkLast 成功while (!linkLast(node))// 添加到队列尾部失败的话, 将当前线程放到非满的等待条件, 等待唤醒后, 尝试添加元素到队列notFull.await();} finally {lock.unlock();}}public void putFirst(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {// 阻塞等待linkFirst成功while (!linkFirst(node))// 添加到队列尾部失败的话, 当前线程放到非满的等待条件, 等待唤醒后, 尝试添加元素到队列notFull.await();} finally {lock.unlock();}}
}
lock 加锁后一直阻塞等待, 直到元素插入到队列中。
整体的方法和 offer 方法类似, 前者在当队列已满时, 进入阻塞, 后者在队列已满时, 则是返回。
2.3.2 数据出队方法
同入队的方法一样, 出队也有多种实现, LinkedBlockingDeque 提供了好几种出队的方法, 大体如下:
- remove() / removeFirst() /removeLast()
- poll() / pollFirst() / pollLast()
- poll(long timeout, TimeUnit unit) / pollFirst(long timeout, TimeUnit unit) / pollLast(long timeout, TimeUnit unit)
- take() / takeFirst() / takeLast()
remove() 相关的方法
public class LinkedBlockingDeque<E> {public E remove() {// 调用自身的 removeFirst 方法return removeFirst();}public E removeFirst() {// 调用自身的 pollFirst 方法E x = pollFirst();if (x == null) throw new NoSuchElementException();return x;}public E removeLast() {// 调用自身的 pollLast 方法E x = pollLast();if (x == null) throw new NoSuchElementException();return x;}
}
remove() 相关的方法都是调用到对应的 poll() 方法, 在拿到方法的返回值后, 做一层非空的判断。
poll() 相关的方法
public class LinkedBlockingDeque<E> {public E poll() {// 调用到自身的 pollFirst 方法return pollFirst();}public E pollFirst() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 带锁的执行 unlinkFirst 方法return unlinkFirst();} finally {lock.unlock();}}public E pollLast() {final ReentrantLock lock = this.lock;lock.lock();try {// 带锁的执行 unlinkLast 方法return unlinkLast();} finally {lock.unlock();}}private E unlinkFirst() {Node<E> f = first;// 判断当前的头节点, 为空的话, 直接返回 nullif (f == null)return null;// 获取头节点的下一个节点 Node<E> n = f.next;// 获取头节点的数据E item = f.item;// 置空头节点的数据f.item = null;// 把头节点的下一个节点指向自身f.next = f;// 把头节点的指针 first 指向当前头节点的下一个节点first = n;// 头节点的下一个节点为空if (n == null)// 置空尾结点last = null;else// 否则设置头节点的下一个节点的上一个节点为空n.prev = null;// 队列元素个数 - 1 --count;// 唤醒在 notFull 等待队列上的线程notFull.signal();return item;}private E unlinkLast() {Node<E> l = last;// 尾节点为空, 直接返回 nullif (l == null)return null;// 获取尾节点的上一个节点Node<E> p = l.prev;// 获取尾节点的数据E item = l.item;// 置空尾节点的数据l.item = null;// 设置尾结点的上一个节点为自身l.prev = l;// 设置尾节点的指针 last 指向当前尾结点的上一个节点last = p;// 尾结点的上一个节点为空if (p == null)// 置空头节点first = null;else// 设置尾结点的上一个节点的下一个节点为空p.next = null;// 队列元素个数 - 1 --count;// 唤醒在 notFull 等待队列上的线程notFull.signal();return item;}}
poll() 相关的方法的实现很简单, 最终就是调用到内部的 unlinkFirst() 和 unlinkLast() 方法, 而 2 个就是链表的节点删除操作。
而 poll() 相关的带超时时间的方法就不展开了, 原理和 LinkedBlockingQueue 一样, 利用了 Condition 的 awaitNanos 进行超时等待, 并在外面用 while 循环控制等待时的中断问题。
take() 相关的方法
public class LinkedBlockingDeque<E> {public E take() throws InterruptedException {// 调用到自身的 takeFirst 方法return takeFirst();}public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;// 通过 unlinkFirst 方法删除头节点, // 删除失败, 即返回结果为 null, 将当前线程阻塞在非空等待条件上// 等待唤醒后重新执行删除while ((x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}}public E takeLast() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;// 通过 unlinkFirst 方法删除头节点, // 删除失败, 即返回结果为 null, 将当前线程阻塞在非空等待条件上// 等待唤醒后重新执行删除while ((x = unlinkLast()) == null)notEmpty.await();return x;} finally {lock.unlock();}}}
和 put() 方法一个套路, 先通过 lock 加锁, 然后 while 循环重试, 失败就 await 阻塞等待, 等待下次唤醒, 直至成功。
2.3.3 获取元素方法
LinkedBlockingDeque 提供了获取元素的方法主要有 2 个 element() 和 peek() 方法, 他们的区别在于, element() 方法在队列为空时会抛出异常, 而 peek() 方法则是返回 null。
public class LinkedBlockingDeque<E> {public E element() {// 调用 getFirst() 方法return getFirst();}public E peek() {// 调用 peekFirst() 方法return peekFirst();}public E getFirst() {// 调用 getFirst 方法结果还是调用到 peekFirst() 方法E x = peekFirst();if (x == null) throw new NoSuchElementException();return x;}public E getLast() {// 调用 getLast 方法结果还是调用到 peekLast() 方法E x = peekLast();if (x == null) throw new NoSuchElementException();return x;}public E peekFirst() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 头节点为空吗 ? 是的话返回 null, 否则返回头节点的数据return (first == null) ? null : first.item;} finally {lock.unlock();}}public E peekLast() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 尾节点为空吗? 是的话返回 null, 否则返回尾节点的数据return (last == null) ? null : last.item;} finally {lock.unlock();}}
}
获取元素前加锁, 防止并发问题导致数据不一致, 然后利用 first 和 last 2 个节点直接可以获得元素。
2.3.4 删除元素方法
public class LinkedBlockingDeque<E> {public boolean remove(Object o) {// 调用自身的 removeFirstOccurrence 方法return removeFirstOccurrence(o);}public boolean removeFirstOccurrence(Object o) {// 从头开始遍历// 需要移除的节点为 null, 直接抛异常if (o == null) return false;final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 从 first 向后开始遍历比较, 找到元素后调用 unlink 移除for (Node<E> p = first; p != null; p = p.next) {if (o.equals(p.item)) {unlink(p);return true;}}return false;} finally {lock.unlock();}}public boolean removeLastOccurrence(Object o) {// 从尾开始遍历if (o == null) return false;final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 从 last 向前开始遍历比较, 找到元素后调用 unlink 移除for (Node<E> p = last; p != null; p = p.prev) {if (o.equals(p.item)) {unlink(p);return true;}}return false;} finally {lock.unlock();}}void unlink(Node<E> x) {// 获取删除节点的上一个节点Node<E> p = x.prev;// 获取删除节点的下一个节点Node<E> n = x.next;// 需要删除的节点上一个节点为空, 需要删除的节点为头节点if (p == null) {unlinkFirst();} else if (n == null) {// 需要删除的节点的下一个节点为空, 需要删除对接的为尾节点unlinkLast();} else {// 把需要删除的节点的上一个节点的下一个节点的指针 指向 需要删除的节点的下一个节点p.next = n;// 把需要删除的节点的下一个节点的上一个节点的指针 指向 需要删除的节点的上一个节点n.prev = p;// 置空需要删除节点的数据x.item = null;// 队列中的元素个数 - 1--count;// 唤醒阻塞在 notFull 等待队列上的线程notFull.signal();}}
}
删除元素是从头 / 尾向两边进行遍历比较, 故时间复杂度为 O(n), 最后调用 unlink 把要移除元素的 prev 和 next 进行关联, 把要移除的元素从链中脱离, 等待下次 GC 回收。
3 总结
总体而言, LinkedBlockingDeque 的源码实现通过锁、条件等待和双向链表结构, 保证了在多线程环境中对队列的安全操作。
其本身可以作为一个双端队列使用, 也可以作为一个双端阻塞队列使用, 具有很好的灵活性。
4 转载
【细谈Java并发】谈谈LinkedBlockingDeque