ForkJoinPool大型图文现场(一阅到底 vs 直接收藏) (6)

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

lockRunState /** * Acquires the runState lock; returns current (locked) runState. */ // 从方法注释中看到,该方法一定会返回 locked 的 runState,也就是说一定会加锁成功 private int lockRunState() { int rs; return ((((rs = runState) & RSLOCK) != 0 || !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? awaitRunStateLock() : rs); }

因为 RSLOCK = 1,如果 runState & RSLOCK == 0,则说明目前没有加锁,进入或运算的下半段 CAS

先通过 CAS 尝试加锁,尝试成功直接返回,尝试失败则要调用 awaitRunStateLock 方法

/** * Spins and/or blocks until runstate lock is available. See * above for explanation. */ private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //判断是否加锁(==0表示未加锁) if (((rs = runState) & RSLOCK) == 0) { // 通过CAS加锁 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { if (wasInterrupted) { try { // 重置线程终端标记 Thread.currentThread().interrupt(); } catch (SecurityException ignore) { // 这里竟然 catch 了个寂寞 } } // 加锁成功返回最新的 runState,for 循环的唯一正常出口 return ns; } } else if (r == 0) r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) --spins; } // Flag1 如果是其他线程正在初始化占用锁,则调用 yield 方法让出 CPU,让其快速初始化 else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) Thread.yield(); // initialization race // Flag2 如果其它线程持有锁,并且线程池已经初始化,则将唤醒位标记为1 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { // 进入互斥锁 synchronized (lock) { // 再次判断,如果等于0,说明进入互斥锁前刚好有线程进行了唤醒,就不用等待,直接进行唤醒操作即可,否则就进入等待 if ((runState & RSIGNAL) != 0) { try { lock.wait(); } catch (InterruptedException ie) { if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else lock.notifyAll(); } } } }

上面代码 33 ~ 34 (Flag1)行以及 36 ~ 50 (Flag2) 行,如果你没看后续代码,现在来理解是有些困难的,我这里先提前说明一下:

Flag1: 当完整的初始化 ForkJoinPool 时,直接利用了 stealCounter 这个原子变量,因为初始化时(调用 externalSubmit 时),才会对 StealCounter 赋值。所以,这里的逻辑是,当状态不是 STARTED 或者 stealCounter 为空,让出线程等待,也就是说,别的线程还没初始化完全,让其继续占用锁初始化即可

Flag2: 我们在讲等待/通知模型时就说,不要让无限自旋尝试,如果资源不满足就等待,如果资源满足了就通知,所以,如果 (runState & RSIGNAL) == 0 成立,说明有线程需要唤醒,直接唤醒就好,否则也别浪费资源,主动等待一会

当阅读到这的代码时,马上就抛出来两个问题:

Q1: 既然是加锁,为什么不用已有的轮子 ReentrantLock 呢?

PS:如果你读过并发系列 Java AQS队列同步器以及ReentrantLock的应用 ,你会知道 ReentrantLock 是用一个完整字段 state 来控制同步状态。但这里在竞争锁的时候还会判断线程池的状态,如果是初始化状态主动 yield 放弃 CPU 来减少竞争;另外,用一个完整的 runState 不同位来表示状态也体现出更细的粒度吧

Q2: synchronized 大法虽好,但是我们都知道这是比较重量级的锁,为什么还在这里应用了呢?

PS: 首先 synchronized 经过不断优化,没有它刚诞生时那么重,另外按照 Flag 2 的代码含义,进入 synchronized 同步块的概率还是很低的,可以用最简单的方式稳稳兜底(奥卡姆剃刀了原理?)

有加锁自然要解锁,向下看 unlockRunState

unlockRunState

解锁的逻辑相对简单多了,总体目标是清除锁标记位。如果顺利将状态修改为目标状态,自然解锁成功;否则表示有别的线程进入了wait,需要调用notifyAll唤醒,重新尝试竞争

/** * Unlocks and sets runState to newRunState. * * @param oldRunState a value returned from lockRunState * @param newRunState the next value (must have lock bit clear). */ private void unlockRunState(int oldRunState, int newRunState) { if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { Object lock = stealCounter; runState = newRunState; // clears RSIGNAL bit if (lock != null) synchronized (lock) { lock.notifyAll(); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgysx.html