Java并发编程系列-AbstractQueuedSynchronizer (3)

第二步:调用addConditionWaiter()方法,目的是将当前线程封装成为Node节点并添加到等待队列的尾部,源码如下:

public class ConditionObject implements Condition, java.io.Serializable { private Node addConditionWaiter() { Node t = lastWaiter;// 保存尾节点 // If lastWaiter is cancelled, clean out. // 如果尾节点线程被取消,则清除之 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters();// 清除等待队列中所有的被取消的线程节点 t = lastWaiter; } // 将当前线程封装成为等待队列的Node节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) // 如果等待队列为空,则将新节点作为头节点 firstWaiter = node; else // 否则将新节点作为新的尾节点添加到等待队列中 t.nextWaiter = node; // 更新尾节点指针 lastWaiter = node; return node; } }

这个方法里面除了封装节点和添加节点之外,还有针对等待队列进行清理的流程,主要是为了清理被取消的线程节点

第三步:调用fullyRelease(node)方法,用于释放当前线程所持有的锁并唤醒同步队列的下一节点,详情可见AQS方法解析部分;

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 获取同步状态state值 // 执行release方法,尝试释放当前线程持有的共享状态,并唤醒下一个线程 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } }

第四步:调用LockSupport.park(this)阻塞当前线程,一但消除被中断后者线程被唤醒转移到同步队列,则退出循环,继续下一步;

这里涉及到一个中断模式的问题。中断模式之前提到过,有两种:REINTERRUPT和THROW_IE,分别表示针对被中断的线程在退出等待队列时的处理方式,前者重新中断,后者则抛出异常。
此处interruptMode表示的就是中断模式的值,初始赋值为0,然后通过checkInterruptWhileWaiting(node)方法不断的进行校验,其源码如下:

public class ConditionObject implements Condition, java.io.Serializable { private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } }

如果线程被中断则通过方法transferAfterCancelledWait(node)判断线程是否是在被唤醒之前被中断,如果是则返回true,否则返回false;如果返回true则采用THROW_IN模式,否则采用REINTERRUPT模式。无论是上面的哪一种模式都代表线程被中断了,那么此处interruptMode就不再是0,那么条件成立,break退出循环。除此之外transferAfterCancelledWait(node)方法无论返回true还是false,都会将现场节点转移到同步队列中

第五步:当前线程已经被转移到同步队列中,然后开始自旋以获取同步状态,待其获取到同步状态(锁)之后,返回该线程是否被中断,如果被中断,再根据其中断模式进行整理,如何整理呢,主要就是如果当前中断模式是THROW_IE模式,则保持不变,否则一律修改成REINTERRUPT模式,之后会再次进行一次同步队列节点清理。

第六步:最后针对不同的中断模式进行中断处理,如果是THROW_IN则抛出异常,如果是REINTERRUPT则再次进行中断。

awaitNanos(long):

public class ConditionObject implements Condition, java.io.Serializable { public final long awaitNanos(long nanosTimeout) throws InterruptedException { // 1-优先响应中断 if (Thread.interrupted()) throw new InterruptedException(); // 2-将当前线程封装成Node节点并添加到等待队列尾部 Node node = addConditionWaiter(); // 3-释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点 int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout;// 计算截止时间点 int interruptMode = 0; // 4-阻塞当前线程,直到被中断或者被唤醒或者超时 // 4-1 校验当前线程是否被唤醒,如果没有进入循环体 while (!isOnSyncQueue(node)) { // 4-2 如果超时时间小于等于0,则表示线程立即超时,然后进行线程节点转移处理,并结束循环 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node);// 转移线程节点 break; } // 4-3 如果超时设置时间nanosTimeout大于等于spinForTimeoutThreshold,则进行定时阻塞当前线程 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 4-4 如果线程被中断,则转移线程到同步队列,并结束循环 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 每次循环都会计算新的nanosTimeout值,然后在下次循环的时候设置阻塞的时限 nanosTimeout = deadline - System.nanoTime(); } // 5-自旋等待获取到同步状态(即获取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); // 6-处理被中断的情况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } }

方法解析:

这个方法的流程与上面的await基本一致,只是在第4步中添加了关于超时判断的逻辑,这里就着重看一下这一部分,其余部分不再赘述。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwwysj.html