源码分析:Phaser 之更灵活的同步屏障 (2)

运行结果:

Thread-1 doing 阶段:1 Thread-4 doing 阶段:1 Thread-3 doing 阶段:1 Thread-2 doing 阶段:1 Thread-5 doing 阶段:1 阶段phase: 1 执行完毕 Thread-5 doing 阶段:2 Thread-3 doing 阶段:2 Thread-4 doing 阶段:2 Thread-1 doing 阶段:2 Thread-2 doing 阶段:2 阶段phase: 2 执行完毕 Thread-2 doing 阶段:3 Thread-3 doing 阶段:3 Thread-1 doing 阶段:3 Thread-4 doing 阶段:3 Thread-5 doing 阶段:3 阶段phase: 3 执行完毕 源码分析 内部类QNode

内部等待队列,用于在阻塞时记录等待线程及相关信息

static final class QNode implements ForkJoinPool.ManagedBlocker { final Phaser phaser; final int phase; final boolean interruptible; final boolean timed; boolean wasInterrupted; long nanos; final long deadline; volatile Thread thread; // nulled to cancel wait QNode next; // 由此看出是一个单向列表 QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos) { this.phaser = phaser; this.phase = phase; this.interruptible = interruptible; this.nanos = nanos; this.timed = timed; this.deadline = timed ? System.nanoTime() + nanos : 0L; thread = Thread.currentThread(); } ... 部分代码省略 ... } 主要的属性 // 状态变量,用于存储当前阶段phase、参与者数parties、未完成的参与者数unarrived_count // 低0-15位表示未到达parties数,中16-31位表示等待的parties数,中32-62位表示当前阶段phase private volatile long state; // 最多可以有多少个参与者,即每个阶段最多有多少个任务,十进制表示为65535 private static final int MAX_PARTIES = 0xffff; // 最多可以有多少阶段,2的31次方-1,十进制:2147483647 private static final int MAX_PHASE = Integer.MAX_VALUE; // 参与者数量的偏移量 private static final int PARTIES_SHIFT = 16; // 阶段的偏移量 private static final int PHASE_SHIFT = 32; // 未完成的参与者数的掩码,低16位,二进制:1111 1111 1111 1111 private static final int UNARRIVED_MASK = 0xffff; // to mask ints // 参与者数,中间16位,二进制:1111 1111 1111 1111 0000 0000 0000 0000 private static final long PARTIES_MASK = 0xffff0000L; // to mask longs // counts的掩码,counts等于参与者数和未完成的参与者数的'|'操作 // 二进制:1111 1111 1111 1111 1111 1111 1111 1111 private static final long COUNTS_MASK = 0xffffffffL; // 二进制位第64位为1,终止位 private static final long TERMINATION_BIT = 1L << 63; // 一些特殊的值 // 一次一个参与者完成 private static final int ONE_ARRIVAL = 1; // 增加减少参与者时使用,1左移16位,二进制:0001 0000 0000 0000 0000 private static final int ONE_PARTY = 1 << PARTIES_SHIFT; // 减少参与者时使用,二进制:0001 0000 0000 0000 0001 private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; // 没有参与者时使用 private static final int EMPTY = 1; // 当前Phaser的父级;如果没有,则为null private final Phaser parent; /** phaser的根。如果不在树中则等于phaser */ private final Phaser root; /** 两个队列链表,在偶数和奇数阶段交替使用 */ private final AtomicReference<QNode> evenQ; // 偶数 private final AtomicReference<QNode> oddQ; // 奇数 构造方法 public Phaser() { this(null, 0); } public Phaser(int parties) { this(null, parties); } public Phaser(Phaser parent) { this(parent, 0); } public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) throw new IllegalArgumentException("Illegal number of parties"); int phase = 0; this.parent = parent; if (parent != null) { // 有设置parent final Phaser root = parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) phase = parent.doRegister(1); } else { this.root = this; // root是当前phaser // 初始化两个队列 this.evenQ = new AtomicReference<QNode>(); this.oddQ = new AtomicReference<QNode>(); } // 确定state,先是一个三目运算 // parties 为 0 时,state为 1 // this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | // 当前阶段左移32位 ((long)parties << PARTIES_SHIFT) | // 等待的parties数,左移16位 ((long)parties); // 未到达parties数,就存低16位 }

整个构造方法最重要的就是最后state值的确认,也可以看出低0-15位表示未到达parties数,中16-31位表示等待的parties数,中32-62位表示当前阶段phase。

比如入参为5的话,初始化的state值的二进制表示为:0101 0000 0000 0000 0101

register()方法

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyfy.html