硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer (9)

其实代码的实现和独占模式有很多类似的地方,一个很大的不同点是:共享模式同步器当节点获取资源成功晋升为头节点之后,它会把自身的等待状态通过CAS更新为Node.PROPAGATE,下一个加入等待队列的新节点会把头节点的等待状态值更新回Node.SIGNAL,标记后继节点处于可以被唤醒的状态,如果遇上资源释放,那么这个阻塞的节点就能被唤醒从而解除阻塞。我们还是画图理解一下,先假设tryAcquireShared(int arg)总是返回小于0的值,入队两个阻塞的线程thread-1和thread-2,然后进行资源释放确保tryAcquireShared(int arg)总是返回大于0的值:

j-a-q-s-ex-10

看起来和独占模式下的同步等待队列差不多,实际上真正不同的地方在于有节点解除阻塞和晋升为头节点的过程。因此我们可以先看releaseShared(int arg)的源码:

// 共享模式下释放资源 public final boolean releaseShared(int arg) { // 尝试释放资源成功则调用前面分析过的doReleaseShared以传播唤醒状态和unpark头节点的后继节点 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 共享模式下尝试释放资源,必须由子类覆盖 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }

releaseShared(int arg)就是在tryReleaseShared(int arg)调用返回true的情况下主动调用一次doReleaseShared()从而基于头节点传播唤醒状态和unpark头节点的后继节点。接着之前的图:

j-a-q-s-ex-11

j-a-q-s-ex-12

接着看acquireSharedInterruptibly(int arg)的源码实现:

// 共享模式下获取资源的方法,响应线程中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 和非响应线程中断的acquireShared方法类似,不过这里解除阻塞之后直接抛出异常InterruptedException if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

最后看tryAcquireSharedNanos(int arg, long nanosTimeout)的源码实现:

// 共享模式下获取资源的方法,带超时时间版本 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 注意这里只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都认为获取资源成功 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算超时的最终期限 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } //重新计算剩余的超时时间 nanosTimeout = deadline - System.nanoTime(); // 超时的情况下直接取消获取 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 满足阻塞状态并且剩余的超时时间大于阀值1000纳秒则通过LockSupport.parkNanos()阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); // 解除阻塞后判断线程的中断标记并且清空标记位,如果是处于中断状态则抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywszj.html