由于我们示例中初始化Itr的时候的prevTakeIndex为0,故isDetached返回为false,程序将调用incorporateDequeues方法,根据注释我们也知道,该方法主要是调整和迭代器相关的内部索引。
/** * Adjusts indices to incorporate all dequeues since the last * operation on this iterator. Call only from iterating thread. */ private void incorporateDequeues() { final int cycles = itrs.cycles; final int takeIndex = ArrayBlockingQueue.this.takeIndex; final int prevCycles = this.prevCycles; final int prevTakeIndex = this.prevTakeIndex; if (cycles != prevCycles || takeIndex != prevTakeIndex) { final int len = items.length; // how far takeIndex has advanced since the previous // operation of this iterator long dequeues = (cycles - prevCycles) * len + (takeIndex - prevTakeIndex); // Check indices for invalidation if (invalidated(lastRet, prevTakeIndex, dequeues, len)) lastRet = REMOVED; if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) nextIndex = REMOVED; if (invalidated(cursor, prevTakeIndex, dequeues, len)) cursor = takeIndex; if (cursor < 0 && nextIndex < 0 && lastRet < 0) detach(); else { this.prevCycles = cycles; this.prevTakeIndex = takeIndex; } } }注意cursor = takeIndex这句代码,将外部内的takeIndex赋值给cursor,这样子将队列和迭代器数据读取进行了同步。
对于iterator1,第一次调用next()方法时,cursor被赋值为3首先将nextItem的值保持在x变量中,即hadoop字符串。
然后设置nextItem和cursor的值
设置完成后,nextItem为flink,cursor为-1。
最后返回保存在x变量中的值,即返回hadoop字符串。
第二次调用next()方法时,输出的值即上次保存的nextItem值,即flink字符串。
迭代器运行过程中,相关变量内容如下:
至于iterator2迭代器,各位可以自己去分析,不再赘述。
本文主要以实例讲解Semaphore、阻塞队列,并分析了相关核心源码实现。
本文参考
Java 7 Concurrency Cookbook
concurrency-modle-seven-week