大纲
1.并发安全的数组列表CopyOnWriteArrayList
2.并发安全的链表队列ConcurrentLinkedQueue
3.并发编程中的阻塞队列概述
4.JUC的各种阻塞队列介绍
5.LinkedBlockingQueue的具体实现原理
6.基于两个队列实现的集群同步机制
4.JUC的各种阻塞队列介绍
(1)基于数组的阻塞队列ArrayBlockingQueue
(2)基于链表的阻塞队列LinkedBlockingQueue
(3)优先级阻塞队列PriorityBlockingQueue
(4)延迟阻塞队列DelayQueue
(5)无存储结构的阻塞队列SynchronousQueue
(6)阻塞队列结合体LinkedTransferQueue
(7)双向阻塞队列LinkedBlockingDeque
(1)基于数组的阻塞队列ArrayBlockingQueue
ArrayBlockingQueue是一个基于数组实现的阻塞队列。其构造方法可以指定:数组的长度、公平还是非公平、数组的初始集合。
ArrayBlockingQueue会通过ReentrantLock来解决线程竞争的问题,以及采用Condition来解决线程的唤醒与阻塞的问题。
//A bounded BlockingQueue backed by an array.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //The queued itemsfinal Object[] items;//items index for next take, poll, peek or removeint takeIndex;//items index for next put, offer, or addint putIndex;//Number of elements in the queueint count;//Main lock guarding all accessfinal ReentrantLock lock;//Condition for waiting takesprivate final Condition notEmpty;//Condition for waiting putsprivate final Condition notFull;//Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.public ArrayBlockingQueue(int capacity) {this(capacity, false);}//Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}//Inserts the specified element at the tail of this queue, //waiting for space to become available if the queue is full.public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {notFull.await();}enqueue(e);} finally {lock.unlock();}}//Inserts element at current put position, advances, and signals.//Call only when holding lock.private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) {putIndex = 0;}count++;notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}}//Returns the number of elements in this queue.public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}...
}
(2)基于链表的阻塞队列LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列,它可以不指定阻塞队列的长度,它的默认长度是Integer.MAX_VALUE。由于这个默认长度非常大,一般也称LinkedBlockingQueue为无界队列。
//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...//The capacity bound, or Integer.MAX_VALUE if noneprivate final int capacity;//Current number of elementsprivate final AtomicInteger count = new AtomicInteger();//Head of linked list.transient Node<E> head;//Tail of linked list.private transient Node<E> last;//Lock held by take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//Lock held by put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//Wait queue for waiting takesprivate final Condition notEmpty = takeLock.newCondition();//Wait queue for waiting putsprivate final Condition notFull = putLock.newCondition();//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public int size() {return count.get();}...
}
(3)优先级阻塞队列PriorityBlockingQueue
PriorityBlockingQueue是一个支持自定义元素优先级的无界阻塞队列。默认情况下添加的元素采用自然顺序升序排列,当然可以通过实现元素的compareTo()方法自定义优先级规则。
PriorityBlockingQueue是基于数组实现的,这个数组会自动进行动态扩容。在应用方面,消息中间件可以基于优先级阻塞队列来实现。
//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.//The element with the lowest value is in queue[0], assuming the queue is nonempty.private transient Object[] queue;//The number of elements in the priority queue.private transient int size;//The comparator, or null if priority queue uses elements' natural ordering.private transient Comparator<? super E> comparator;//Lock used for all public operationsprivate final ReentrantLock lock;//Condition for blocking when emptyprivate final Condition notEmpty;//Spinlock for allocation, acquired via CAS.private transient volatile int allocationSpinLock;//Creates a PriorityBlockingQueue with the default initial capacity (11) that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {if (initialCapacity < 1) {throw new IllegalArgumentException();}this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never block.public void put(E e) {offer(e); // never need to block}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never return false.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length)) {tryGrow(array, cap);}try {Comparator<? super E> cmp = comparator;if (cmp == null) {siftUpComparable(n, e, array);} else {siftUpUsingComparator(n, e, array, cmp);}size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//Tries to grow array to accommodate at least one more element (but normally expand by about 50%), //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {throw new OutOfMemoryError();}newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array) {newArray = new Object[newCap];}} finally {allocationSpinLock = 0;}}if (newArray == null) {// back off if another thread is allocatingThread.yield();}lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {while(k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = x;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null) {notEmpty.await();}} finally {lock.unlock();}return result;}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}...
}
(4)延迟阻塞队列DelayQueue
DelayQueue是一个支持延迟获取元素的无界阻塞队列,它是基于优先级队列PriorityQueue实现的。
往DelayQueue队列插入元素时,可以按照自定义的delay时间进行排序。也就是队列中的元素顺序是按照到期时间排序的,只有delay时间小于或等于0的元素才能够被取出。
DelayQueue的应用场景有:
一.订单超时支付需要自动取消订单
二.任务超时处理需要自动丢弃任务
三.消息中间件的实现
//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null.
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//Thread designated to wait for the element at the head of the queue.//When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.//The leader thread must signal some other thread before returning from take() or poll(...), //unless some other thread becomes leader in the interim. //Whenever the head of the queue is replaced with an element with an earlier expiration time, //the leader field is invalidated by being reset to null, and some waiting thread, //but not necessarily the current leader, is signalled. //So waiting threads must be prepared to acquire and lose leadership while waiting.private Thread leader = null;//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.private final Condition available = lock.newCondition();//Creates a new {@code DelayQueue} that is initially empty.public DelayQueue() {}//Inserts the specified element into this delay queue. //As the queue is unbounded this method will never block.public void put(E e) {offer(e);}//Inserts the specified element into this delay queue.public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}//Retrieves and removes the head of this queue, //waiting if necessary until an element with an expired delay is available on this queue.public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await();} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {return q.poll();} first = null; // don't retain ref while waitingif (leader != null) {available.await();} else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) {leader = null;}}}}}} finally {if (leader == null && q.peek() != null) {available.signal();}lock.unlock();}}...
}public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {//Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: //For each node n in the heap and each descendant d of n, n <= d. //The element with the lowest value is in queue[0], assuming the queue is nonempty.transient Object[] queue; //The number of elements in the priority queue.private int size = 0;//The comparator, or null if priority queue uses elements' natural ordering.private final Comparator<? super E> comparator;public E peek() {return (size == 0) ? null : (E) queue[0];}//Inserts the specified element into this priority queue.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}modCount++;int i = size;if (i >= queue.length) {grow(i + 1);}size = i + 1;if (i == 0) {queue[0] = e;} else {siftUp(i, e);}return true;}//Increases the capacity of the array.private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0) {newCapacity = hugeCapacity(minCapacity);}queue = Arrays.copyOf(queue, newCapacity);}private void siftUp(int k, E x) {if (comparator != null) {siftUpUsingComparator(k, x);} else {siftUpComparable(k, x);}}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = key;}@SuppressWarnings("unchecked")private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (comparator.compare(x, (E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = x;}...
}
(5)无存储结构的阻塞队列SynchronousQueue
SynchronousQueue的内部没有容器来存储数据,因此当生产者往其添加一个元素而没有消费者去获取元素时,生产者会阻塞。当消费者往其获取一个元素而没有生产者去添加元素时,消费者也会阻塞。
SynchronousQueue的本质是借助了无容量存储的特点,来实现生产者线程和消费者线程的即时通信,所以它特别适合在两个线程之间及时传递数据。
线程池是基于阻塞队列来实现生产者/消费者模型的。当向线程池提交任务时,首先会把任务放入阻塞队列中,然后线程池中会有对应的工作线程专门处理阻塞队列中的任务。
Executors.newCachedThreadPool()就是基于SynchronousQueue来实现的,它会返回一个可以缓存的线程池。如果这个线程池大小超过处理当前任务所需的数量,会灵活回收空闲线程。当任务数量增加时,这个线程池会不断创建新的工作线程来处理这些任务。
public class Executors {...//Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available.//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.//Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters)//may be created using ThreadPoolExecutor constructors.public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}...
}
(6)阻塞队列结合体LinkedTransferQueue
LinkedTransferQueue是一个基于链表实现的无界阻塞TransferQueue。
阻塞队列的特性是根据队列的数据情况来阻塞生产者线程或消费者线程,TransferQueue的特性是生产者线程生产数据后必须等消费者消费才返回。
LinkedTransferQueue是TransferQueue和LinkedBlockingQueue的结合体,而SynchronousQueue内部其实也是基于TransferQueue来实现的,所以LinkedTransferQueue是带有阻塞队列功能的SynchronousQueue。
(7)双向阻塞队列LinkedBlockingDeque
LinkedBlockingDeque是一个基于链表实现的双向阻塞队列,双向队列的两端都可以插入和移除元素,可减少多线程并发下的一半竞争。
5.LinkedBlockingQueue的具体实现原理
(1)阻塞队列的设计分析
(2)有界队列LinkedBlockingQueue
(3)LinkedBlockingQueue的put()方法
(4)LinkedBlockingQueue的take()方法
(5)LinkedBlockingQueue使用两把锁拆分锁功能
(6)LinkedBlockingQueue的size()方法和迭代
(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列
(1)阻塞队列的设计分析
阻塞队列的特性为:如果队列为空,消费者线程会被阻塞。如果队列满了,生产者线程会被阻塞。
为了实现这个特性:如何让线程在满足某个特定条件的情况下实现阻塞和唤醒?阻塞队列中的数据应该用什么样的容器来存储?
线程的阻塞和唤醒,可以使用wait/notify或者Condition。阻塞队列中数据的存储,可以使用数组或者链表。
(2)有界队列LinkedBlockingQueue
一.并发安全的无界队列
比如ConcurrentLinkedQueue,是没有边界没有大小限制的。它就是一个单向链表,可以无限制的往里面去存放数据。如果不停地往无界队列里添加数据,那么可能会导致内存溢出。
二.并发安全的有界队列
比如LinkedBlockingQueue,是有边界的有大小限制的。它也是一个单向链表,如果超过了限制,往队列里添加数据就会被阻塞。因此可以限制内存队列的大小,避免内存队列无限增长,最后撑爆内存。
(3)LinkedBlockingQueue的put()方法
put()方法是阻塞添加元素的方法,当队列满时,阻塞添加元素的线程。
首先把添加的元素封装成一个Node对象,该对象表示链表中的一个结点。
然后使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁,加锁的目的是保证数据添加到队列过程中的安全性 + 避免队列长度超阈值。
接着调用enqueue()方法把封装的Node对象存储到链表尾部,然后通过AtomicInteger来递增当前阻塞队列中的元素个数。
最后根据AtomicInteger类型的变量判断队列元素是否已超阈值。
注意:这里用到了一个很重要的属性notFull。notFull是一个Condition对象,用来阻塞和唤醒生产者线程。如果队列元素个数等于最大容量,就调用notFull.await()阻塞生产者线程。如果队列元素个数小于最大容量,则调用notFull.signal()唤醒生产者线程。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;//将添加的元素封装成一个Node对象Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//当前队列元素的数量putLock.lockInterruptibly();//加可被中断的锁try {//注意:这里用到了一个很重要的属性notFull,它是一个Condition对象,用来阻塞和唤醒生产者线程//如果阻塞队列当前的元素个数等于最大容量,就调用notFull.await()方法来阻塞生产者线程while (count.get() == capacity) {notFull.await();//阻塞当前线程,并释放锁}//把封装的Node对象存储到链表中enqueue(node);//通过AtomicInteger来递增当前阻塞队列中的元素个数,用于后续判断是否已超阻塞队列的最大容量c = count.getAndIncrement();//根据AtomicInteger类型的变量判断队列元素是否已超阈值if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();//释放锁}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {//node先成为当前last的next//然后last又指向last的next(即node)last = last.next = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}...
}
(4)LinkedBlockingQueue的take()方法
take()方法是阻塞获取元素的方法,当队列为空时,阻塞获取元素的线程。
首先使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁。
然后判断元素个数是否为0,若是则通过notEmpty.await()阻塞消费者线程。
否则接着调用dequeue()方法从链表头部获取一个元素,并通过AtomicInteger来递减当前阻塞队列中的元素个数。
最后判断阻塞队列中的元素个数是否大于1,如果是,则调用notEmpty.signal()唤醒被阻塞的消费者线程。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//获取一个可中断的锁takeLock.lockInterruptibly();try {//判断元素个数是否为0while (count.get() == 0) {notEmpty.await();//阻塞当前线程并释放锁}//调用dequeue()方法从链表中获取一个元素x = dequeue();//通过AtomicInteger来递减当前阻塞队列中的元素个数c = count.getAndDecrement();//判断阻塞队列中的元素个数是否大于1//如果是,则调用notEmpty.signal()唤醒被阻塞的消费者消除if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}//首先获取链表的头结点head//然后拿到头结点的下一个结点first//然后把原来的头结点从队列中移除,设置first结点的数据为null,并将first结点设置为新的头结点//最后返回first结点的数据private E dequeue() {Node<E> h = head;//h指向headNode<E> first = h.next;//first指向h的nexth.next = h;// help GChead = first;E x = first.item;first.item = null;return x;}...
}
(5)LinkedBlockingQueue使用两把锁拆分锁功能
两把独占锁可以提升并发性能,因为出队和入队用的是不同的锁。这样在并发出队和入队的时候,出队和入队就可以同时执行,不会锁冲突。
这也是锁优化的一种思想,通过将一把锁按不同的功能进行拆分,使用不同的锁控制不同功能下的并发冲突,从而提升性能。
(6)LinkedBlockingQueue的size()方法和迭代
一.size()方法获取的结果也不是100%准确
LinkedBlockingQueue的size()方法获取元素个数是通过AtomicInteger获取的。
相比ConcurrentLinkedQueue通过遍历队列获取,准确性大很多。
相比CopyOnWriteArrayList通过遍历老副本数组获取,准确性也大很多。
但是相比ConcurrentHashMap通过分段CAS统计,那么准确性则差不多。
注意:LinkedBlockingQueue也不能获取到100%准确的队列元素的个数。除非锁掉整个队列,调用size()时不允许入队和出队,才会是100%准确。因为是完成入队或出队之后,才会对AtomicInteger变量进行递增或递减。
二.迭代时获取两把锁来锁整个队列
LinkedBlockingQueue的遍历会直接锁整个队列,即会先获取两把锁。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {...private final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁public int size() {return count.get();}public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current = head.next;if (current != null) {currentElement = current.item;}} finally {fullyUnlock();}}...}void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}...
}
(7)对比LinkedBlockingQueue链表队列和ArrayBlockingQueue数组队列
一.LinkedBlockingQueue是基于链表实现的有界阻塞队列,ArrayBlockingQueue是基于数组实现的有界阻塞队列
二.ArrayBlockingQueue的整体实现原理与LinkedBlockingQueue的整体实现原理是一样的
三.LinkedBlockingQueue需要使用两把独占锁,分别锁出队和入队的场景
四.ArrayBlockingQueue只使用一把锁,锁整个数组,所以其入队和出队不能同时进行
五.ArrayBlockingQueue执行size()方法获取元素个数时会直接加独占锁
六.ArrayBlockingQueue和LinkedBlockingQueue执行迭代方法时都会锁数据
6.基于两个队列实现的集群同步机制
(1)服务注册中心集群需要实现的功能
(2)基于两个队列实现的集群同步机制
(3)使用ConcurrentLinkedQueue实现第一个队列
(4)使用LinkedBlockingQueue实现第二个队列
(5)集群同步机制的具体实现
(1)服务注册中心集群需要实现的功能
服务实例向任何一个服务注册中心实例发送注册、下线、心跳的请求,该服务注册中心实例都需要将这些信息同步到其他的服务注册中心实例上,从而确保所有服务注册中心实例的内存注册表的数据是一致的。
(2)基于两个队列实现的集群同步机制
某服务注册中心实例接收到服务实例A的请求时,首先会把服务实例A的服务请求信息存储到本地的内存注册表里,也就是把服务实例A的服务请求信息写到第一个内存队列中,之后该服务注册中心实例对服务实例A的请求处理就可以结束并返回。
接着该服务注册中心实例会有一个后台线程消费第一个内存队列里的数据,把消费到的第一个内存队列的数据batch打包然后写到第二个内存队列里。
最后该服务注册中心实例还有一个后台线程消费第二个内存队列里的数据,把消费到的第二个内存队列的数据同步到其他服务注册中心实例中。
(3)使用ConcurrentLinkedQueue实现第一个队列
首先有两种队列:
一是无界队列ConcurrentLinkedQueue,基于CAS实现,并发性能很高。
二是有界队列LinkedBlockingQueue,基于两把锁实现,并发性能一般。
LinkedBlockingQueue默认的队列长度是MAX_VALUE,所以可以看成是无界队列。但是也可以指定正常大小的队列长度,从而实现入队的阻塞,避免耗尽内存。
当服务注册中心实例接收到各种请求时,会先将请求信息放入第一个队列。所以第一个队列会存在高并发写的情况,因此LinkedBlockingQueue不合适。
因为LinkedBlockingQueue属于阻塞队列,如果LinkedBlockingQueue满了,那么服务注册中心实例中的,处理服务请求的线程,就会被阻塞住。而且LinkedBlockingQueue的并发性能也不是太高,要获取独占锁才能写,所以最好还是使用无界队列ConcurrentLinkedQueue来实现第一个队列。
(4)使用LinkedBlockingQueue实现第二个队列
消费第一个内存队列的数据时,可以按时间来进行batch打包,比如每隔500ms才将消费到的所有数据打包成一个batch消息。接着再将这个batch信息放入到第二个内存队列中,这样消费第二个队列的数据时,只需同步batch信息到集群其他实例即可。
可见对第二个队列进行的入队和出队操作是由少数的后台线程来执行的,因此可以使用有界队列LinkedBlockingQueue来实现第二个内存队列。
此外还要估算有界队列LinkedBlockingQueue的队列长度应设置多少才合适。假如每一条需要同步给集群其他实例的请求信息,有6个字段,占30字节。平均每一条batch信息会包含100条请求信息,也就是会占3000字节 = 3KB。那么1000条batch消息,才占用3000KB = 3MB。因此可以设置第二个内存队列LinkedBlockingQueue的长度为1000。
(5)集群同步机制的具体实现
//集群同步组件
public class PeersReplicator {//集群同步生成batch的间隔时间:500msprivate static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;private static final PeersReplicator instance = new PeersReplicator();private PeersReplicator() {//启动接收请求和打包batch的线程AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();acceptorBatchThread.setDaemon(true); acceptorBatchThread.start();//启动同步发送batch的线程PeersReplicateThread peersReplicateThread = new PeersReplicateThread();peersReplicateThread.setDaemon(true);peersReplicateThread.start(); }public static PeersReplicator getInstance() {return instance;}//第一个内存队列:处理高并发的服务请求,所以存在高并发的写入情况,无界队列private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();//第二个内存队列:有界队列,用于同步batch消息到其他集群实例private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000); //同步服务注册请求public void replicateRegister(RegisterRequest request) {request.setType(AbstractRequest.REGISTER_REQUEST); //将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步服务下线请求public void replicateCancel(CancelRequest request) {request.setType(AbstractRequest.CANCEL_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步发送心跳请求public void replicateHeartbeat(HeartbeatRequest request) {request.setType(AbstractRequest.HEARTBEAT_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//负责接收数据以及打包为batch的后台线程class AcceptorBatchThread extends Thread {long latestBatchGeneration = System.currentTimeMillis();@Overridepublic void run() {while(true) {try {//每隔500ms生成一个batchPeersReplicateBatch batch = new PeersReplicateBatch();long now = System.currentTimeMillis();if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {//已经到了500ms的时间间隔//将batch消息放入第二个内存队列replicateQueue.offer(batch);//更新latestBatchGenerationlatestBatchGeneration = System.currentTimeMillis();//重置batchbatch = new PeersReplicateBatch();} else {//还没到500ms的时间间隔//从第一层队列获取数据,然后batch放入到第二层队列中AbstractRequest request = acceptorQueue.poll();if (request != null) {batch.add(request); } else {Thread.sleep(100);} }} catch (Exception e) {e.printStackTrace(); }}}}//集群同步线程class PeersReplicateThread extends Thread {@Overridepublic void run() {while(true) {try {PeersReplicateBatch batch = replicateQueue.take();if (batch != null) {//遍历其他的register-server地址//给每个地址的register-server都发送一个http请求同步batchSystem.out.println("给其他的register-server发送请求,同步batch......"); }} catch (Exception e) {e.printStackTrace(); }}}}
}//用于进行批量同步的batch消息
public class PeersReplicateBatch {private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();public void add(AbstractRequest request) {this.requests.add(request);}public List<AbstractRequest> getRequests() {return requests;}public void setRequests(List<AbstractRequest> requests) {this.requests = requests;}
}//负责接收和处理register-client发送过来的请求的
public class RegisterServerController {//服务注册表private ServiceRegistry registry = ServiceRegistry.getInstance();//服务注册表的缓存private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();//集群同步组件private PeersReplicator peersReplicator = PeersReplicator.getInstance();//服务注册public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse = new RegisterResponse();try {//在注册表中加入这个服务实例ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateRegister(registerRequest);registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE);}return registerResponse;}//服务下线 public void cancel(CancelRequest cancelRequest) {//从服务注册中摘除实例registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateCancel(cancelRequest); }//发送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse();try {//获取服务实例ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance != null) {serviceInstance.renew();}//记录一下每分钟的心跳的次数HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();heartbeatMessuredRate.increment();//进行集群同步peersReplicator.replicateHeartbeat(heartbeatRequest);heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//同步batch数据public void replicateBatch(PeersReplicateBatch batch) {for (AbstractRequest request : batch.getRequests()) {if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {register((RegisterRequest) request);} else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {cancel((CancelRequest) request);} else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {heartbeat((HeartbeatRequest) request); }}}//拉取全量注册表public Applications fetchFullRegistry() {return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);}//拉取增量注册表public DeltaRegistry fetchDeltaRegistry() {return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY); }
}