源码分析:Phaser 之更灵活的同步屏障 (5)

arriveAndDeregister()方法和arrive()方法非常类似,都是调用的doArrive()方法,只是入参有些区别,arriveAndDeregister()方法传入的入参是ONE_DEREGISTER,同时减参与者和未到达者。

arriveAndAwaitAdvance()方法

到达并等待其他人到达

public int arriveAndAwaitAdvance() { // Specialization of doArrive+awaitAdvance eliminating some reads/paths final Phaser root = this.root; for (;;) { // 自旋 // 当前state值 long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); // 位运算-->当前阶段 if (phase < 0) // onAdvance()方法返回true后,中断位标识后phase就会小于0 return phase; int counts = (int)s; // =>int // 未到达数 int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); // 到达时边界异常 // CAS 修改state值 s-=1 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) // 还是超过1个未到达,加入队列阻塞等待 return root.internalAwaitAdvance(phase, null); // 到下面这里,说明是最后一个到达 if (root != this) // root 不是当前自己,交由父节点阻塞等待 return parent.arriveAndAwaitAdvance(); // 位运算,得到parties,s是CAS计算过后的值, long n = s & PARTIES_MASK; // base of next state // 即下一次需要到达的参与者数量 int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) // 判断是否要终止,nextUnarrived == 0 n |= TERMINATION_BIT; // 标识终止位 else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; // n 加上unarrived的值,下个阶段 int nextPhase = (phase + 1) & MAX_PHASE; // +1,进入下一个阶段 n |= (long)nextPhase << PHASE_SHIFT; // 标识到具体的位 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) // CAS 修改 return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase); // 唤醒当前阶段的线程,可以进行下一段了 return nextPhase; //返回下一阶段 } } }

arriveAndAwaitAdvance()方法总结:

主要逻辑就是自旋+CAS 修改state中低16的unarrived的值-1,知道自旋修改成功

如果调用当前的线程不是最后一个到达,需要入队阻塞等待

如果是最后一个到达的线程,则调用onAdvance()方法,返回true表示需要被中断,之后的phase就会小于0,再次调用arriveAndAwaitAdvance()方法也就么有阻塞等待效果了

onAdvance()方法支持重写,我们可以自定义判断规则

awaitAdvance()方法

等待指定phase数,返回下一个 arrival phase number。

public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); // 当前阶段 if (phase < 0) return phase; if (p == phase) // 阻塞或等待phase前进到下一代,internalAwaitAdvance见上面代码分析 return root.internalAwaitAdvance(phase, null); return p; } Phaser 总结

Phaser 使用了state变量来维护各个逻辑状态的计数

state的低0-15位表示未到达parties数,中16-31位表示等待的parties数,中32-62位表示当前阶段phase,第64位为终止位

维护的QNode队列根据当前阶段的奇偶性来选择,判断奇偶性可以使用(phase & 1) == 0来快速判断

每个阶段最后一个参与者到达时,会唤醒队列中的线程进入到下一阶段,不是最后一个参与者到达会阻塞等待

重写onAdvance方法可以达到CyclicBarrier的barrierAction类似效果,即在阶段完成执行指定的命令

于CyclicBarrier和CountDownLatch比较灵活在那里?

Phaser 支持分层,支持多个阶段,功能更加丰富与灵活

可以使用register方法追加参与者;By: https://jinglingwang.cn

也可以使用arriveAndDeregister方法到达但是不用等待

CountDownLatch 不支持循环使用,只能控制一个或一组线程

CyclicBarrier 支持循环使用,但不支持分层,不支持修改任务数

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyfy.html