从 parties 和 count 的变量声明中,你也能看出一些门道,前者有 final 修饰,初始化后就不可以改变了,因为 CyclicBarrier 的设计目的是可以循环利用的,所以始终用 parties 来记录线程总数,当 count 计数器变为 0 后,如果没有 parties 的值赋给它,怎么进行重新复用再次计数呢,所以这里维护两个变量很有必要
接下来就看看 await() 到底是怎么实现的
// 从方法签名上可以看出,该方法同样可以被中断,另外还有一个 BrokenBarrierException 异常,我们一会看 public int await() throws InterruptedException, BrokenBarrierException { try { // 调用内部 dowait 方法, 第一个参数为 false,表示不设置超时时间,第二个参数也就没了意义 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }接下来看看 dowait(false, 0L) 做了哪些事情 (这个方法内容有点多,别担心,逻辑并不复杂,请看关键代码注释)
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 还记得之前说过的 Lock 标准范式吗? JDK 内部都是这么使用的,你一定也要遵循范式 lock.lock(); try { final Generation g = generation; // broken 是静态内部类 Generation唯一的一个成员变量,用于记录当前屏障是否被打破,如果打破,则抛出 BrokenBarrierException 异常 // 这里感觉挺困惑的,我们要【冲破】屏障,这里【打破】屏障却抛出异常,注意我这里的用词 if (g.broken) throw new BrokenBarrierException(); // 如果线程被中断,则会通过 breakBarrier 方法将 broken 设置为true,也就是说,如果有线程收到中断通知,直接就打破屏障,停止 CyclicBarrier, 并唤醒所有线程 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // ************************************ // 因为 breakBarrier 方法在这里会被调用多次,为了便于大家理解,我直接将 breakBarrier 代码插入到这里 private void breakBarrier() { // 将打破屏障标识 设置为 true generation.broken = true; // 重置计数器 count = parties; // 唤醒所有等待的线程 trip.signalAll(); } // ************************************ // 每当一个线程调用 await 方法,计数器 count 就会减1 int index = --count; // 当 count 值减到 0 时,说明这是最后一个调用 await() 的子线程,则会突破屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 获取构造函数中的 barrierCommand,如果有值,则运行该方法 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 激活其他因调用 await 方法而被阻塞的线程,并重置 CyclicBarrier nextGeneration(); // ************************************ // 为了便于大家理解,我直接将 nextGeneration 实现插入到这里 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } // ************************************ return 0; } finally { if (!ranAction) breakBarrier(); } } // index 不等于0, 说明当前不是最后一个线程调用 await 方法 // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 没有设置超时时间 if (!timed) // 进入条件等待 trip.await(); else if (nanos > 0L) // 否则,判断超时时间,这个我们在 AQS 中有说明过,包括为什么最后超时阈值 spinForTimeoutThreshold 不再比较的原因,大家会看就好 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 条件等待被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // 如果新一轮回环结束,会通过 nextGeneration 方法新建 generation 对象 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }doWait 就是 CyclicBarrier 的核心逻辑, 可以看出,该方法入口使用了 ReentrantLock,这也就是为什么 Generation broken 变量没有被声明为 volatile 类型保持可见性,因为对其的更改都是在锁的内部,同样在锁的内部对计数器 count 做更新,也保证了原子性
doWait 方法中,是通过 nextGeneration 方法来重新初始化/重置 CyclicBarrier 状态的,该类中还有一个 reset() 方法,也是重置 CyclicBarrier 状态的
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }但 reset() 方法并没有在 CyclicBarrier 内部被调用,显然是给 CyclicBarrier 使用者来调用的,那问题来了
什么时候调用 reset() 方法呢
正常情况下,CyclicBarrier 是会被自动重置状态的,从 reset 的方法实现中可以看出调用了 breakBarrier
方法,也就是说,调用 reset 会使当前处在等待中的线程最终抛出 BrokenBarrierException 并立即被唤醒,所以说 reset() 只会在你想打破屏障时才会使用