我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能
JDK AQSwait await
notify singal
notifyAll singalAll
用法上也及其相似:
await :
// 初始化 Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); try { lock.lock(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }singal:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); try { lock.lock(); condition.signal(); } finally { lock.unlock(); } 3.1、条件队列我们知道在AQS内部维护了一个阻塞队列,数据结构如下:
上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用
那AQS内部的另一个条件队列又是什么样的数据结构呢?
可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。上图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus为 -2 。当有新节点加入时,将会追加至队列尾部
3.2、唤醒当我们调用signal()方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列
而signalAll()执行时,具体执行流程与signal()类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的
四、JDK vs AQS经过上文的介绍,似乎AQS做了与wait/notify相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比
4.1、对比我们模拟这样的一个场景:启动10个线程,分别调用wait()方法,当所有线程都进入阻塞后,调用notifyAll(),10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时
JDK测试代码:
public class ConditionCompareTest { @Test public void runTest() throws InterruptedException { long begin = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { if (i % 1000 == 0) { System.out.println(i); } jdkTest(); } long cost = System.currentTimeMillis() - begin; System.out.println("耗时: " + cost); } public void jdkTest() throws InterruptedException { Object lock = new Object(); List<Thread> list = Lists.newArrayList(); // 步骤一:启动10个线程,并进入wait等待 for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { try { synchronized (lock) { lock.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); list.add(thread); } // 步骤二:等待10个线程全部进入wait方法 while (true) { boolean allWaiting = true; for (Thread thread : list) { if (thread.getState() != Thread.State.WAITING) { allWaiting = false; break; } } if (allWaiting) { break; } } // 步骤三:唤醒10个线程 synchronized (lock) { lock.notifyAll(); } // 步骤四:等待10个线程全部执行完毕 for (Thread thread : list) { thread.join(); } } }AQS测试代码:
public class ConditionCompareTest { private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); @Test public void runTest() throws InterruptedException { long begin = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { if (i % 1000 == 0) { System.out.println(i); } aqsTest(); } long cost = System.currentTimeMillis() - begin; System.out.println("耗时: " + cost); } @Test public void aqsTest() throws InterruptedException { AtomicInteger lockedNum = new AtomicInteger(); List<Thread> list = Lists.newArrayList(); // 步骤一:启动10个线程,并进入wait等待 for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { try { lock.lock(); lockedNum.incrementAndGet(); condition.await(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); list.add(thread); } // 步骤二:等待10个线程全部进入wait方法 while (true) { if (lockedNum.get() != 10) { continue; } boolean allWaiting = true; for (Thread thread : list) { if (thread.getState() != Thread.State.WAITING) { allWaiting = false; break; } } if (allWaiting) { break; } } // 步骤三:唤醒10个线程 lock.lock(); condition.signalAll(); lock.unlock(); // 步骤四:等待10个线程全部执行完毕 for (Thread thread : list) { thread.join(); } } } 条件队列 耗时1 耗时2 耗时3 耗时4 耗时5 平均耗时(ms)JDK 5000 5076 5054 5089 4942 5032
AQS 5358 5440 5444 5473 5472 5437
4.2、基准测试Q&A
基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来