Java并发基础-并发工具类(二) (5)

获取数据
take()方法源码如下:

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //(1) lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //(2) return dequeue(); } finally { lock.unlock(); //(9) } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //(3) items[takeIndex] = null; //(4) if (++takeIndex == items.length) takeIndex = 0; //(5) count--; //(6) if (itrs != null) itrs.elementDequeued(); //(7) notFull.signal(); //(8) return x; }

(1):获取互斥锁
(2):如果count为0,即队列为空,则释放互斥锁,然后挂起当前线程
(3):根据takeIndex索引到数组中获取具体的值,并赋值给x
(4):赋值完成后,takeIndex索引位置数据置null,便于回收
(5):takeIndex加1,然后和队列长度比较,如果相等,即已经读取到队列尾部,takeIndex置0
(6):获取后,将队列元素个数count减1
(7):维护和queue相关的迭代器
(8):唤醒等待插入元素的线程
(9):释放互斥锁
可以看到ArrayBlockingQueue每次获取元素后,都会唤醒等待插入元素的线程。

迭代器
在分析源码前,我们先看在一个迭代器的示例

package com.aidodoo.java.concurrent; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by zhangkh on 2018/9/10. */ public class ArrayBlockingQueueIterDemo { public static void main(String[] args) throws InterruptedException{ BlockingQueue<String> queue=new ArrayBlockingQueue(5); queue.put("hadoop"); queue.put("spark"); queue.put("storm"); queue.put("flink"); Iterator<String> iterator1 = queue.iterator(); System.out.println( queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(); while(iterator1.hasNext()) { System.out.println(iterator1.next()); } System.out.println(); Iterator<String> iterator2 = queue.iterator(); while(iterator2.hasNext()) { System.out.println(iterator2.next()); } } }

程序输出如下:

hadoop spark storm hadoop flink flink

我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值
当分别插入hadoop、spark、storm、flink四个元素后,内部变量的值如下:

此时,ArrayBlockingQueue的成员变量的值itrs为null。
调用iterator()方法后,源码如下:

public Iterator<E> iterator() { return new Itr(); //(1) } Itr() { lastRet = NONE; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); //(2) try { if (count == 0) { //(3) cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); //(4) cursor = incCursor(takeIndex); //(5) if (itrs == null) { itrs = new Itrs(this); //(6) } else { itrs.register(this); //(7) itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; } } finally { lock.unlock(); //(8) }

}
(1):调用内部类Itr的构造方法
(2):获取外部类即ArrayBlockingQueue的锁
(3):没有没有元素,初始化变量值。内部类Itr的成员变量如下:

/** Index to look for new nextItem; NONE at end */ private int cursor; /** Element to be returned by next call to next(); null if none */ private E nextItem; /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ private int nextIndex; /** Last element returned; null if none or not detached. */ private E lastItem; /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ private int lastRet; /** Previous value of takeIndex, or DETACHED when detached */ private int prevTakeIndex; /** Previous value of iters.cycles */ private int prevCycles;

(4):将外部类的takeIndex赋值给内部类nextIndex,并获取数组具体的值赋值给nextItem
(5):计算游标cursor的下个值,其中incCursor方法如下:

private int incCursor(int index) { // assert lock.getHoldCount() == 1; if (++index == items.length) index = 0; if (index == putIndex) index = NONE; return index; }

(6):注册,主要是维护链表
(7):清理itrs
(8):释放外部类的互斥锁
在上面的示例中,调用iterator()方法后,Itr的内部变量值如下:

由于后面三次调用了queue.take(),依次输出hadoop、spark、storm后,相关成员变量的值见图片标识,重点关注takeIndex=3。

当调用next()方法时,代码如下:

public E next() { final E x = nextItem; if (x == null) throw new NoSuchElementException(); final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { if (!isDetached()) //(1) incorporateDequeues(); lastRet = nextIndex; final int cursor = this.cursor; if (cursor >= 0) { nextItem = itemAt(nextIndex = cursor); this.cursor = incCursor(cursor); } else { nextIndex = NONE; nextItem = null; } } finally { lock.unlock(); } return x; }

其中(1)处的isDetached方法如下

boolean isDetached() { // assert lock.getHoldCount() == 1; return prevTakeIndex < 0; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydzdg.html