共享模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,重新设置头节点的过程会传播唤醒的状态,简单来说就是唤醒一个有效的后继节点,只要一个节点可以晋升为头节点,它的后继节点就能被唤醒,以此类推。节点的唤醒顺序遵循类似于FIFO的原则,通俗说就是先阻塞或者阻塞时间最长则先被唤醒。
使用共享模式同步器的主要类库有:
信号量Semaphore。
倒数栅栏CountDownLatch。
Condition的实现Condition实例的建立是在Lock接口的newCondition()方法,它是锁条件等待的实现,基于作用或者语义可以见Condition接口的相关API注释:
Condition是对象监视器锁方法Object#wait()、Object#notify()和Object#notifyAll()的替代实现,对象监视器锁实现锁的时候作用的效果是每个锁对象必须使用多个wait-set(JVM内置的等待队列),通过Object提供的方法和监视器锁结合使用就能达到Lock的实现效果。如果替换synchronized方法和语句并且结合使用Lock和Condition,就能替换并且达到对象监视器锁的效果。
Condition必须固有地绑定在一个Lock的实现类上,也就是要通过Lock的实例建立Condition实例,而且Condition的方法调用使用必须在Lock的"锁定代码块"中,这一点和synchronized关键字以及Object的相关JNI方法使用的情况十分相似。
前文介绍过Condition接口提供的方法以及Condition队列,也就是条件等待队列,通过画图简单介绍了它的队列节点组成。实际上,条件等待队列需要结合同步等待队列使用,这也刚好对应于前面提到的Condition的方法调用使用必须在Lock的锁定代码块中。听起来很懵逼,我们慢慢分析一下ConditionObject的方法源码就能知道具体的原因。
先看ConditionObject#await()方法:
// 退出等待后主动进行中断当前线程 private static final int REINTERRUPT = 1; // 退出等待后抛出InterruptedException异常 private static final int THROW_IE = -1; /** * 可中断的条件等待实现 * 1、当前线程处于中断状态则抛出InterruptedException * 2、保存getState返回的锁状态,并且使用此锁状态调用release释放所有的阻塞线程 * 3、线程加入等待队列进行阻塞,直到signall或者中断 * 4、通过保存getState返回的锁状态调用acquire方法 * 5、第4步中阻塞过程中中断则抛出InterruptedException */ public final void await() throws InterruptedException { // 如果线程是中断状态则清空中断标记位并且抛出InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 当前线程所在的新节点加入条件等待队列 Node node = addConditionWaiter(); // 释放当前AQS中的所有资源返回资源的status保存值,也就是基于status的值调用release(status) - 其实这一步是解锁操作 int savedState = fullyRelease(node); // 初始化中断模式 int interruptMode = 0; // 如果节点新建的节点不位于同步队列中(理论上应该是一定不存在),则对节点所在线程进行阻塞,第二轮循环理论上节点一定在同步等待队列中 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 处理节点所在线程中断的转换操作 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 节点所在线程被唤醒后,如果节点所在线程没有处于中断状态,则以独占模式进行头节点竞争 // 注意这里使用的status是前面释放资源时候返回的保存下来的status if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 下一个等待节点不空,则从等待队列中移除所有取消的等待节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // interruptMode不为0则按照中断模式进行不同的处理 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 释放当前AQS中的所有资源,其实也就是基于status的值调用release(status) // 这一步对于锁实现来说,就是一个解锁操作 final int fullyRelease(Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { // 释放失败则标记等待状态为取消 node.waitStatus = Node.CANCELLED; throw t; } } // 传入的节点是否在同步队列中 final boolean isOnSyncQueue(Node node) { // 节点等待您状态为CONDITION或者前驱节点为null则返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 因为等待队列是通过nextWaiter连接,next引用存在说明节点位于同步队列 if (node.next != null) return true; // 从同步队列的尾部向前遍历是否存在传入的节点实例 return findNodeFromTail(node); } // 从同步队列的尾部向前遍历是否存在传入的节点实例 private boolean findNodeFromTail(Node node) { for (Node p = tail;;) { if (p == node) return true; if (p == null) return false; p = p.prev; } } // 这是一个很复杂的判断,用了两个三目表达式,作用是如果新建的等待节点所在线程中断, // 则把节点的状态由CONDITION更新为0,并且加入到同步等待队列中,返回THROW_IE中断状态,如果加入同步队列失败,返回REINTERRUPT // 如果新建的等待节点所在线程没有中断,返回0,也就是初始状态的interruptMode private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } // 节点线程中断取消等待后的转换操作 final boolean transferAfterCancelledWait(Node node) { // CAS更新节点的状态由CONDITION更改为0 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { // 节点加入同步等待队列 enq(node); return true; } // 这里尝试自旋,直到节点加入同步等待队列成功 while (!isOnSyncQueue(node)) Thread.yield(); return false; } // 等待完毕后报告中断处理,前边的逻辑得到的interruptMode如果为THROW_IE则抛出InterruptedException,如果为REINTERRUPT则中断当前线程 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }