这才是图文并茂:我写了1万多字,就是为了让你了解AQS是怎么运行的 (5)

此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:

private void doAcquireShared(int arg) { // 加入队列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // CAS自旋 for (;;) { final Node p = node.predecessor(); // 判断前驱结点是否是head if (p == head) { // 尝试获取一定数量的锁 int r = tryAcquireShared(arg); if (r >= 0) { // 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程 setHeadAndPropagate(node, r); // 让前驱结点去掉引用链,方便被GC p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // head指向自己 setHead(node); // 如果还有剩余量,继续唤醒下一个邻居线程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }

看到这里,你会不会一点熟悉的感觉,这个方法的逻辑怎么跟上面那个acquireQueued() 那么类似啊?对的,其实两个流程并没有太大的差别。只是doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完一定的资源后,发现还有剩余的资源,就继续唤醒下一个邻居线程,这才符合"共享"的思想嘛。

这里我们可以提出一个疑问,共享模式下,当前线程释放了一定数量的资源,但这部分资源满足不了下一个等待结点的需要的话,那么会怎么样?

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

接着往下说吧,唤醒下一个邻居线程的逻辑在doReleaseShared()中,我们放到下面的释放锁来解析。

释放锁

共享模式释放锁的顶层方法是releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

该方法同样包含两部分的逻辑:

tryReleaseShared:释放资源。

doAcquireShared:唤醒后继结点。

跟tryAcquireShared方法一样,tryReleaseShared在AQS中没有具体的实现,由子同步器自己去定义,但功能都一样,就是释放一定数量的资源。

释放完资源后,线程不会马上就收工,而是唤醒等待队列里最前排的等待结点。

doAcquireShared

唤醒后继结点的工作在doReleaseShared()方法中完成,我们可以看下它的源码:

private void doReleaseShared() { for (;;) { // 获取等待队列中的head结点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // head结点waitStatus = -1,唤醒下一个结点对应的线程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继结点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }

代码没什么特别的,就是如果等待队列head结点的waitStatus为-1的话,就直接唤醒后继结点,唤醒的方法unparkSuccessor()在上面已经讲过了,这里也没必要再复述。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjfxf.html