count表示当前队列中的元素数量,LinkedBlockingQueue的入队列和出队列使用了两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。
head和last分别表示链表的头部和尾部;
takeLock表示元素出队列时线程所获取的锁,当执行take、poll等操作时线程获取;notEmpty当队列为空时,通过该Condition让获取元素的线程处于等待状态;
putLock表示元素入队列时线程所获取的锁,当执行put、offer等操作时获取;notFull当队列容量达到capacity时,通过该Condition让加入元素的线程处于等待状态。
其次,LinkedBlockingQueue有三个构造方法,分别如下:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }默认构造函数直接调用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化队列的同时,将一个集合的全部元素加入队列。
最后分析下put和take的过程,这里重点关注:LinkedBlockingQueue如何实现添加/移除并行的?
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }之所以把put和take放在一起,是因为它们是一对互逆的过程:
put在插入元素前首先获得putLock和当前队列的元素数量,take在去除元素前首先获得takeLock和当前队列的元素数量;
put时需要判断当前队列是否已满,已满时当前线程进行等待,take时需要判断队列是否已空,队列为空时当前线程进行等待;
put调用enqueue在队尾插入元素,并修改尾指针,take调用dequeue将head指向原来first的位置,并将first的数据域置位null,实现删除原first指针,并产生新的head,同时,切断原head节点的引用,便于垃圾回收。
private void enqueue(Node<E> node) { last = last.next = node; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }最后,put根据count决定是否触发队列未满和队列空;take根据count决定是否触发队列未空和队列满。
回到刚才的问题:LinkedBlockingQueue如何实现添加/移除并行的?
LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理。
同样的,LinkedBlockingDeque在LinkedBlockingQueue的基础上,增加了双向操作的属性。继续以put和take为例,LinkedBlockingDeque增加了putFirst/putLast、takeFirst/takeLast方法,分别用于在队列头、尾进行添加和删除。与LinkedBlockingQueue不同的是,LinkedBlockingDeque的入队列和出队列不再使用不同的Lock。
final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition();其中,lock表示读写的主锁,notEmpty和notFull依然表示相应的控制线程状态条件量。以putFirst和takeFirst为例:
public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { lock.unlock(); } }
