源码分析:Phaser 之更灵活的同步屏障 (4)

使当前线程到达phaser,不等待其他任务到达。返回arrival phase number。

public int arrive() { // 一次一个参与者完成 return doArrive(ONE_ARRIVAL); // 特殊的属性值 ONE_ARRIVAL: 1 } private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { // 自旋 long s = (root == this) ? state : reconcileState(); // 确定state值 int phase = (int)(s >>> PHASE_SHIFT); //位运算,得到当前阶段phaser if (phase < 0) return phase; int counts = (int)s; // 表示parties和unarrived的值 int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 计算未到达数 if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); // 到达时边界异常 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { // CAS直接修改state if (unarrived == 1) { // == 1 表示当前为最后一个未到达的任务 long n = s & PARTIES_MASK; // 掩码计算当前parties, 保留了16-32位的部分 int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived))// 判断 registeredParties == 0,返回true,需要终止phaser n |= TERMINATION_BIT; // 标识终止位 else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; // 下一个阶段phaser n |= (long)nextPhase << PHASE_SHIFT; // 下一个阶段phaser左移32位再加上当前的phaser就是最新的phaser UNSAFE.compareAndSwapLong(this, stateOffset, s, n); //CAS 修改 releaseWaiters(phase); // 释放等待phase的线程 } else if (nextUnarrived == 0) { // propagate deregistration phase = parent.doArrive(ONE_DEREGISTER); // 使用父节点管理 UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else phase = parent.doArrive(ONE_ARRIVAL); // 使用父节点管理 } // 不是最后一个到达,直接返回phaser return phase; } } }

arrive()方法总结:

通过位运算计算当前state、phaser等值

然后直接使用自旋+CAS更新state值(state-=adjust)

如果当前不是最后一个未到达的任务,直接返回当前phaser值

如果当前是最后一个未到达的任务

如果当前是root节点,判断是否需要终止phase(nextUnarrived == 0)r,然后CAS更新state,最后释放等待phase的线程

如果是分层结构,并且已经没有下一代未到达的parties,则交由父节点处理doArrive逻辑,然后更新state为EMPTY

arriveAndDeregister()方法

使当前线程到达phaser并撤销注册,返回arrival phase number。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyfy.html