获取数据
take()方法源码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //(1)
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
//(2)
return dequeue();
} finally {
lock.unlock();
//(9)
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//(3)
items[takeIndex] = null;
//(4)
if (++takeIndex == items.length)
takeIndex = 0;
//(5)
count--;
//(6)
if (itrs != null)
itrs.elementDequeued();
//(7)
notFull.signal();
//(8)
return x;
}
(1):获取互斥锁
(2):如果count为0,即队列为空,则释放互斥锁,然后挂起当前线程
(3):根据takeIndex索引到数组中获取具体的值,并赋值给x
(4):赋值完成后,takeIndex索引位置数据置null,便于回收
(5):takeIndex加1,然后和队列长度比较,如果相等,即已经读取到队列尾部,takeIndex置0
(6):获取后,将队列元素个数count减1
(7):维护和queue相关的迭代器
(8):唤醒等待插入元素的线程
(9):释放互斥锁
可以看到ArrayBlockingQueue每次获取元素后,都会唤醒等待插入元素的线程。
迭代器
在分析源码前,我们先看在一个迭代器的示例
package com.aidodoo.java.concurrent;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhangkh on 2018/9/10.
*/
public class ArrayBlockingQueueIterDemo {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> queue=new ArrayBlockingQueue(5);
queue.put("hadoop");
queue.put("spark");
queue.put("storm");
queue.put("flink");
Iterator<String> iterator1 = queue.iterator();
System.out.println( queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println();
while(iterator1.hasNext()) {
System.out.println(iterator1.next());
}
System.out.println();
Iterator<String> iterator2 = queue.iterator();
while(iterator2.hasNext()) {
System.out.println(iterator2.next());
}
}
}
程序输出如下:
hadoop
spark
storm
hadoop
flink
flink
我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值
当分别插入hadoop、spark、storm、flink四个元素后,内部变量的值如下:
此时,ArrayBlockingQueue的成员变量的值itrs为null。
调用iterator()方法后,源码如下:
public Iterator<E> iterator() {
return new Itr();
//(1)
}
Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
//(2)
try {
if (count == 0) {
//(3)
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex); //(4)
cursor = incCursor(takeIndex);
//(5)
if (itrs == null) {
itrs = new Itrs(this);
//(6)
} else {
itrs.register(this);
//(7)
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
//(8)
}
}
(1):调用内部类Itr的构造方法
(2):获取外部类即ArrayBlockingQueue的锁
(3):没有没有元素,初始化变量值。内部类Itr的成员变量如下:
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
(4):将外部类的takeIndex赋值给内部类nextIndex,并获取数组具体的值赋值给nextItem
(5):计算游标cursor的下个值,其中incCursor方法如下:
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
(6):注册,主要是维护链表
(7):清理itrs
(8):释放外部类的互斥锁
在上面的示例中,调用iterator()方法后,Itr的内部变量值如下:
由于后面三次调用了queue.take(),依次输出hadoop、spark、storm后,相关成员变量的值见图片标识,重点关注takeIndex=3。
当调用next()方法时,代码如下:
public E next() {
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
//(1)
incorporateDequeues();
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
其中(1)处的isDetached方法如下
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}