文章目录
- 1、PriorityBlockingQueue
- 2、DelayQueue
1、PriorityBlockingQueue
优先级阻塞队列就是在优先级队列的基础上增加队列排序的功能,将高优先级排在前面,所以优先级队列的元素需要实现Comparator接口。
如果数据结构用数组去维护队列的话,要么在put有大量的后移操作,要么在take有大量的前移操作。
为避免这个问题优先级队列内部用二叉堆的数据结构去实现,这样无论是put还是take都不会有大量的移动操作。具体逻辑如下:
put:如果优先级比父节点高就上浮,依次类推,直至不能再上浮
take:直接拿走堆顶元素,然后再用最后一个元素顶上堆顶,再根据优先级下沉该元素
不懂二叉堆的,可以先去查阅一下二叉堆或者堆排序。
虽然二叉堆不是完全有序的,但可以保证堆顶元素的优先级肯定是最高的。
put
public void put(E e) {offer(e); //优先级队列是无界的,所以不需要阻塞,直接调用offer}//入队public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;//加锁lock.lock();int n, cap;Object[] es;//扩容//先释放锁,开辟新数组(长度= size<64? size+2 : size*1.5。)//之后再重新加锁复制到新数组while ((n = size) >= (cap = (es = queue).length))tryGrow(es, cap);try {final Comparator<? super E> cmp;//如果没有传入比较器就用默认的if ((cmp = comparator) == null)//二叉堆节点上浮siftUpComparable(n, e, es);elsesiftUpUsingComparator(n, e, es, cmp);size = n + 1;//有元素了,不为空,notEmpty.signalnotEmpty.signal();} finally {//释放锁lock.unlock();}return true;}//上浮private static <T> void siftUpComparable(int k, T x, Object[] es) {Comparable<? super T> key = (Comparable<? super T>) x;//如果k=0,说明上浮到堆顶了while (k > 0) {//二叉堆父节点在数据的下标=(当前节点下标-1)/2int parent = (k - 1) >>> 1;Object e = es[parent];//不能再上浮了就breakif (key.compareTo((T) e) >= 0)break;//可以上浮,交换当前节点和父节点 es[k] = e;k = parent;}es[k] = key;}
take
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();E result;try {//dequeue出队一个元素while ( (result = dequeue()) == null)//队列为空,notEmpty.await 阻塞notEmpty.await();} finally {//释放锁lock.unlock();}return result;}//出队private E dequeue() {final Object[] es;final E result;//堆顶记入resultif ((result = (E) ((es = queue)[0])) != null) {final int n;//--size并将最后一个元素记入x后清空(赋值null)final E x = (E) es[(n = --size)];es[n] = null;if (n > 0) {final Comparator<? super E> cmp;//如果没有传入比较器就用默认的if ((cmp = comparator) == null)//二叉堆节点下沉siftDownComparable(0, x, es, n);elsesiftDownUsingComparator(0, x, es, n, cmp);}}//返回堆顶return result;}//下沉private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {Comparable<? super T> key = (Comparable<? super T>)x;//假如这里是 (n-1)/2 就是(前面被清空的那个)最后一个元素x的父节点下标//但这里是 (n/2),意味着当最后一个元素x是左孩子节点的话half就是父节点下标,右孩子节点的话half就是父节点下标+1int half = n >>> 1;//k>=half说明下沉到底了while (k < half) {//n*2+1就是左孩子节点int child = (k << 1) + 1;Object c = es[child];//右孩子节点=左孩子节点+1int right = child + 1;//在两个孩子节点中取优先级比较高去跟下沉节点比较if (right < n &&((Comparable<? super T>) c).compareTo((T) es[right]) > 0)c = es[child = right];//如果不能再下沉了就breakif (key.compareTo((T) c) <= 0)break;//可以下沉,交换当前节点和被选中的孩子节点 es[k] = c;k = child;}//最后别忘了替换下沉堆顶的值为被清空的最后一个元素xes[k] = key;}
2、DelayQueue
而延迟队列是在优先级队列的基础上实现的(优先级按延迟时间排序),其内部维护了一个优先级队列。
注:是优先级队列而不是优先级阻塞队列,这两者的区别在于有无阻塞。
为什么要用优先级队列不用优先级阻塞队列?
因为不适用。延迟队列的put不需要阻塞,而take则是需要自己实现一个根据延迟时间来阻塞的逻辑。
put
public void put(E e) {offer(e); //因为不需要阻塞,所以直接调用offer即可}public boolean offer(E e) {final ReentrantLock lock = this.lock;//加锁lock.lock();try {//优先级队列 PriorityQueue.offer() 入队q.offer(e);//如果入队后是第一个元素,需要更新leader的阻塞时间if (q.peek() == e) {//清空leader,leader记录带超时时间等待阻塞队列头节点的线程(只有一个)leader = null;//唤醒所有正在等待的线程重新take(自旋),以更新leader的阻塞时间,同时leader也可能会变available.signal();}return true;} finally {//释放锁lock.unlock();}}
take
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();try {for (;;) {//第一个节点E first = q.peek();//如果为空阻塞,available.await()if (first == null)available.await();else {//如果延迟时间到了,出队long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return q.poll();first = null;//下面的就是延迟时间还没到//如果已经有leader了,就不带超时时间的阻塞,后续由leader唤醒if (leader != null)available.await();//如果还没有leader,就记录leader是当前线程带并超时时间的阻塞else {//记录leader是当前线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//带超时时间的阻塞available.awaitNanos(delay);} finally {//阻塞时间到了,清空leaderif (leader == thisThread)leader = null;}}}}} finally {//如果leader为空(leader的阻塞时间已到,此时leader已经获取到资源了)//且队列中还有资源//就唤醒后面等待的线程if (leader == null && q.peek() != null)available.signal();//释放锁lock.unlock();}}