当前线程加入同步等待队列和同步等待队列的初始化是同一个方法,前文提到过:同步等待队列的初始化会延迟到第一次可能出现竞争的情况,这是为了避免无谓的资源浪费,具体方法是addWaiter(Node mode):
// 添加等待节点到同步等待队列,实际上初始化队列也是这个方法完成的 private Node addWaiter(Node mode) { // 基于当前线程创建一个新节点,节点的模式由调用者决定 Node node = new Node(mode); for (;;) { Node oldTail = tail; // 尾节点不为空说明队列已经初始化过,则把新节点加入到链表中,作为新的尾节点,建立和前驱节点的关联关系 if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { // 尾节点为空说明队列尚未初始化过,进行一次初始化操作 initializeSyncQueue(); } } }在首次调用addWaiter()方法,死循环至少执行两轮再跳出,因为同步队列必须初始化完成后(第一轮循环),然后再把当前线程所在的新节点实例添加到等待队列中再返(第二轮循环)当前的节点,这里需要注意的是新加入同步等待队列的节点一定是添加到队列的尾部并且会更新AQS中的tail属性为最新入队的节点实例。
假设我们使用Node.EXCLUSIVE模式把新增的等待线程加入队列,例如有三个线程分别是thread-1、thread-2和thread-3,线程入队的时候都处于阻塞状态,模拟一下依次调用上面的入队方法的同步队列的整个链表的状态。
先是线程thread-1加入等待队列:
接着是线程thread-2加入等待队列:
最后是线程thread-3加入等待队列:
如果仔细研究会发现,如果所有的入队线程都处于阻塞状态的话,新入队的线程总是添加到队列的tail节点,阻塞的线程总是"争抢"着成为head节点,这一点和CLH队列锁的阻塞线程总是基于前驱节点自旋以获取锁的思路是一致的。下面将会分析的独占模式与共享模式,线程加入等待队列都是通过addWaiter()方法。
理解条件等待队列前面已经相对详细地介绍过同步等待队列,在AQS中还存在另外一种相对特殊和复杂的等待队列-条件等待队列。介绍条件等待队列之前,要先介绍java.util.concurrent.locks.Condition接口。
public interface Condition { // 当前线程进入等待状态直到被唤醒或者中断 void await() throws InterruptedException; // 当前线程进入等待状态,不响应中断,阻塞直到被唤醒 void awaitUninterruptibly(); // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 long awaitNanos(long nanosTimeout) throws InterruptedException; // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 boolean await(long time, TimeUnit unit) throws InterruptedException; // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制 boolean awaitUntil(Date deadline) throws InterruptedException; // 唤醒单个阻塞线程 void signal(); // 唤醒所有阻塞线程 void signalAll(); }Condition可以理解为Object中的wait()、notify()和notifyAll()的替代品,因为Object中的相应方法是JNI(Native)方法,由JVM实现,对使用者而言并不是十分友好(有可能伴随JVM版本变更而受到影响),而Condition是基于数据结构和相应算法实现对应的功能,我们可以从源码上分析其实现。
Condition的实现类是AQS的公有内部类ConditionObject。ConditionObject提供的入队列方法如下:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ - 条件队列的第一个节点 private transient Node firstWaiter; /** Last node of condition queue. */ - 条件队列的最后一个节点 private transient Node lastWaiter; // 公有构造函数 public ConditionObject() { } // 添加条件等待节点 private Node addConditionWaiter() { // 这里做一次判断,当前线程必须步入此同步器实例 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 临时节点t赋值为lastWaiter引用 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 最后一个节点不为条件等待状态,则是取消状态 if (t != null && t.waitStatus != Node.CONDITION) { // 解除所有取消等待的节点的连接 unlinkCancelledWaiters(); t = lastWaiter; } // 基于当前线程新建立一个条件等待类型的节点 Node node = new Node(Node.CONDITION); // 首次创建Condition的时候,最后一个节点临时引用t为null,则把第一个节点置为新建的节点 if (t == null) firstWaiter = node; else // 已经存在第一个节点,则通过nextWaiter连接新的节点 t.nextWaiter = node; // 最后一个节点的引用更新为新节点的引用 lastWaiter = node; return node; } // 从条件等待队列解除所有取消等待的节点的连接,其实就是所有取消节点移除的操作,涉及到双向链表的断链操作、第一个和最后一个节点的引用更新 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 注意这里等待状态的判断 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // 当前同步器实例持有的线程是否当前线程(currentThread()) protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } // 暂时不分析其他方法 }