文章目录
- 普通队列存在的问题
- 单锁实现
- 双锁实现
普通队列存在的问题
- 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
- 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
- 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
因此我们需要解决的问题有
- 用锁保证线程安全
- 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转
单锁实现
ava 中要防止代码段交错执行,需要使用锁,有两种选择
- synchronized 代码块,属于关键字级别提供锁保护,功能少
- ReentrantLock 类,功能丰富
ReentrantLock
配合条件变量
来实现:
在队列满时,不是立刻返回,而是当前线程进入等待
什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行
offer方法:
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;public void offer(String e) {lock.lockInterruptibly();try {while (isFull()) {tailWaits.await(); // 当队列满时, 当前线程进入 tailWaits 等待}array[tail] = e;tail++;size++;} finally {lock.unlock();}
}private boolean isFull() {return size == array.length;
}
- 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
- 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
上述关键点:
- 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
- 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待(while循环)
最终版本:
/*** 单锁实现* @param <E> 元素类型*/
public class BlockingQueue1<E> implements BlockingQueue<E> {private final E[] array;private int head = 0;private int tail = 0;private int size = 0; // 元素个数@SuppressWarnings("all")public BlockingQueue1(int capacity) {array = (E[]) new Object[capacity];}ReentrantLock lock = new ReentrantLock();//单锁Condition tailWaits = lock.newCondition();Condition headWaits = lock.newCondition();//条件变量,底层也是队列@Overridepublic void offer(E e) throws InterruptedException {lock.lockInterruptibly();try {while (isFull()) {tailWaits.await();}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;headWaits.signal();} finally {lock.unlock();}}@Overridepublic void offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly();try {long t = TimeUnit.MILLISECONDS.toNanos(timeout);while (isFull()) {if (t <= 0) {return;}t = tailWaits.awaitNanos(t);}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;headWaits.signal();} finally {lock.unlock();}}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try {while (isEmpty()) {headWaits.await();}E e = array[head];array[head] = null; // help GCif (++head == array.length) {head = 0;}size--;tailWaits.signal();return e;} finally {lock.unlock();}}private boolean isEmpty() {return size == 0;}private boolean isFull() {return size == array.length;}
}
注意
- JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
- 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
- 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
双锁实现
单锁的缺点在于:
- 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
- 冲突的主要是生产者之间:多个 offer 线程修改 tail
- 冲突的还有消费者之间:多个 poll 线程修改 head
如果希望进一步提高性能,可以用两把锁
- 一把锁保护 tail
- 另一把锁保护 head
初步实现:
@Override
public void offer(E e) throws InterruptedException {tailLock.lockInterruptibly();try {// 队列满等待while (isFull()) {tailWaits.await();}// 不满则入队array[tail] = e;if (++tail == array.length) {tail = 0;}// 修改 size (有问题)size++;} finally {tailLock.unlock();}
}
上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
最终版本:
- 两把锁,一把 锁生产者,一把 锁消费者,生产者的signel需要加生产者的锁,然后唤醒消费者,消费者的signel要加消费者的锁,然后唤醒生产者。
- 当向队列取元素时:当队列由满到不满时,由消费者唤醒生产者(此时是生产者锁+生产者条件变量.signel),其他情况(由不满到不满)由消费者唤醒消费者。
- 当向队列存元素时:当队列由空到不空时,由生产者线程唤醒消费者(此时是消费者锁+消费者条件变量.signel),其他情况(由不空到不空时),生产者唤醒生产者。
- 生产者 / 消费者 获得生产者锁与消费者锁是串行,不能嵌套(避免死锁)。
public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head = 0;private int tail = 0;private final AtomicInteger size = new AtomicInteger(0);ReentrantLock headLock = new ReentrantLock();Condition headWaits = headLock.newCondition();ReentrantLock tailLock = new ReentrantLock();Condition tailWaits = tailLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}@Overridepublic void offer(E e) throws InterruptedException {int c;tailLock.lockInterruptibly();try {while (isFull()) {tailWaits.await();}array[tail] = e;if (++tail == array.length) {tail = 0;} c = size.getAndIncrement();// a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程if (c + 1 < array.length) {tailWaits.signal();}} finally {tailLock.unlock();}// b. 从0->不空, 由此offer线程唤醒等待的poll线程if (c == 0) {headLock.lock();try {headWaits.signal();} finally {headLock.unlock();}}}@Overridepublic E poll() throws InterruptedException {E e;int c;headLock.lockInterruptibly();try {while (isEmpty()) {headWaits.await(); }e = array[head]; if (++head == array.length) {head = 0;}c = size.getAndDecrement();// b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程if (c > 1) {headWaits.signal();}} finally {headLock.unlock();}// a. 从满->不满, 由此poll线程唤醒等待的offer线程if (c == array.length) {tailLock.lock();try {tailWaits.signal();} finally {tailLock.unlock();}}return e;}private boolean isEmpty() {return size.get() == 0;}private boolean isFull() {return size.get() == array.length;}}