AQS之ReentrantReadWriteLock精讲分析上篇 (2)

在独占锁中也调用了该方法,头和尾部不为空不相等整明是有节点的,如果返回true,那么就是有当前线程前面的线程在排队,返回false,那么就是当前线程是在队列的头部的下一个节点或者队列是空的

public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 2.3 fullTryAcquireShared

获取读锁的完整版本,可处理tryAcquireShared中未处理的CAS丢失和可重入读操作

final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 获取状态 int c = getState(); // 如果写线程数量不为0 if (exclusiveCount(c) != 0) { // 如果不是当前线程 if (getExclusiveOwnerThread() != current) return -1; // 写线程数量为0并且读线程被阻塞 } else if (readerShouldBlock()) { // 确保我们没有重新获取读锁 if (firstReader == current) { // 当前线程为第一个读线程 } else { // 当前线程不为第一个读线程 if (rh == null) { // 计数器为空 rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) { // 获取当前线程对应的计数器 rh = readHolds.get(); if (rh.count == 0) // 计数为0 // 从readHolds中移除 readHolds.remove(); } } if (rh.count == 0) // 获取锁失败了 return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 比较并且设置成功 if (compareAndSetState(c, c + SHARED_UNIT)) { // 读线程数量为0 if (sharedCount(c) == 0) { // firstReader是第一个获得读取锁定的线程, // 第一个读锁firstReader是不会加入到readHolds中 firstReader = current; firstReaderHoldCount = 1; // 如果第一个读线程是当前线程那么就将计数+1 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 读锁数量不为0并且不为当前线程 // 每个线程读取保持计数的计数器。 维护为ThreadLocal // 缓存在cachedHoldCounter中 if (rh == null) rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程对应的计数器 rh = readHolds.get(); else if (rh.count == 0) // 加入到readHolds中 readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } // 获取锁成功 return 1; } } } 2.4tryAcquireShared失败

如果tryAcquireShared(arg)返回的值为正数或者为0,那么意味着获取锁失败,执行doAcquireShared(arg)方法

private void doAcquireShared(int arg) { // 将节点放入阻塞队列中返回当前节点,addWaiter前一篇文章已经讲过了 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 如果前置节点的waitStatus为唤醒那么就可以安心睡眠了,并且挂起当 // 前线程 if (shouldParkAfterFailedAcquire(p, node) && // 挂起当前线程 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 2.5setHeadAndPropagate

设置队列的头部,并检查后继者是否可能在共享模式下等待,如果正在传播,则传播是否设置为传播> 0或PROPAGATE状态

这个函数做的事情有两件:

在获取共享锁成功后,设置head节点

根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程

在该方法内部我们不仅调用了setHead(node),还在一定条件下调用了doReleaseShared()来唤醒后继的节点。这是因为在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,那么就可以直接通知后继节点来拿锁,而不必等待锁被释放的时候再通知。

propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一

private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定 // 是否传播唤醒 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 2.6 doReleaseShared()

共享模式下的释放动作-信号后继并确保传播

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppjjx.html