万字超强图文讲解AQS以及ReentrantLock应用(建议收藏) (8)

是不是和上面 acquireInterruptibly 方法长相很详细了,继续查看来 doAcquireNanos 方法,看程序, 该方法也是 throws InterruptedException,我们在中断文章中说过,方法标记上有 throws InterruptedException 说明该方法也是可以响应中断的,所以你可以理解超时限制是 acquireInterruptibly 方法的加强版,具有超时和非阻塞控制的双保险

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 超时时间内,为获取到同步状态,直接返回false if (nanosTimeout <= 0L) return false; // 计算超时截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 以独占方式加入到同步队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 计算新的超时时间 nanosTimeout = deadline - System.nanoTime(); // 如果超时,直接返回 false if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && // 判断是最新超时时间是否大于阈值 1000 nanosTimeout > spinForTimeoutThreshold) // 挂起线程 nanosTimeout 长时间,时间到,自动返回 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

上面的方法应该不是很难懂,但是又同学可能在第 27 行上有所困惑

为什么 nanosTimeout 和 自旋超时阈值1000进行比较?

/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;

其实 doc 说的很清楚,说白了,1000 nanoseconds 时间已经非常非常短暂了,没必要再执行挂起和唤醒操作了,不如直接当前线程直接进入下一次循环

到这里,我们自定义的 MyMutex 只差 Condition 没有说明了,不知道你累了吗?我还在坚持

万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)

Condition

如果你看过之前写的 并发编程之等待通知机制 ,你应该对下面这个图是有印象的:

万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)

如果当时你理解了这个模型,再看 Condition 的实现,根本就不是问题了,首先 Condition 还是一个接口,肯定也是需要有实现类的

万字超强图文讲解AQS以及ReentrantLock应用(建议收藏)

那故事就从 lock.newnewCondition 说起吧

public Condition newCondition() { // 使用自定义的条件 return sync.newCondition(); }

自定义同步器重封装了该方法:

Condition newCondition() { return new ConditionObject(); }

ConditionObject 就是 Condition 的实现类,该类就定义在了 AQS 中,只有两个成员变量:

/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;

所以,我们只需要来看一下 ConditionObject 实现的 await / signal 方法来使用这两个成员变量就可以了

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 同样构建 Node 节点,并加入到等待队列中 Node node = addConditionWaiter(); // 释放同步状态 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 挂起当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

这里注意用词,在介绍获取同步状态时,addWaiter 是加入到【同步队列】,就是上图说的入口等待队列,这里说的是【等待队列】,所以 addConditionWaiter 肯定是构建了一个自己的队列:

private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新构建的节点的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 构建单向同步队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

这里有朋友可能会有疑问:

为什么这里是单向队列,也没有使用CAS 来保证加入队列的安全性呢?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyfypd.html