深入理解Java并发框架AQS系列(五):条件队列(Condition) (2)

我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能

JDK AQS
wait   await  
notify   singal  
notifyAll   singalAll  

用法上也及其相似:

await :

// 初始化 Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); try { lock.lock(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }

singal:

Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); try { lock.lock(); condition.signal(); } finally { lock.unlock(); } 3.1、条件队列

我们知道在AQS内部维护了一个阻塞队列,数据结构如下:

阻塞队列FIFO数据结构

上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用

那AQS内部的另一个条件队列又是什么样的数据结构呢?

条件队列数据结构

可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。上图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus为 -2 。当有新节点加入时,将会追加至队列尾部

3.2、唤醒

当我们调用signal()方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列

signal条件队列向阻塞队列转移

而signalAll()执行时,具体执行流程与signal()类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的

四、JDK vs AQS

经过上文的介绍,似乎AQS做了与wait/notify相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比

4.1、对比

我们模拟这样的一个场景:启动10个线程,分别调用wait()方法,当所有线程都进入阻塞后,调用notifyAll(),10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时

JDK测试代码:

public class ConditionCompareTest { @Test public void runTest() throws InterruptedException { long begin = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { if (i % 1000 == 0) { System.out.println(i); } jdkTest(); } long cost = System.currentTimeMillis() - begin; System.out.println("耗时: " + cost); } public void jdkTest() throws InterruptedException { Object lock = new Object(); List<Thread> list = Lists.newArrayList(); // 步骤一:启动10个线程,并进入wait等待 for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { try { synchronized (lock) { lock.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); list.add(thread); } // 步骤二:等待10个线程全部进入wait方法 while (true) { boolean allWaiting = true; for (Thread thread : list) { if (thread.getState() != Thread.State.WAITING) { allWaiting = false; break; } } if (allWaiting) { break; } } // 步骤三:唤醒10个线程 synchronized (lock) { lock.notifyAll(); } // 步骤四:等待10个线程全部执行完毕 for (Thread thread : list) { thread.join(); } } }

AQS测试代码:

public class ConditionCompareTest { private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); @Test public void runTest() throws InterruptedException { long begin = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { if (i % 1000 == 0) { System.out.println(i); } aqsTest(); } long cost = System.currentTimeMillis() - begin; System.out.println("耗时: " + cost); } @Test public void aqsTest() throws InterruptedException { AtomicInteger lockedNum = new AtomicInteger(); List<Thread> list = Lists.newArrayList(); // 步骤一:启动10个线程,并进入wait等待 for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { try { lock.lock(); lockedNum.incrementAndGet(); condition.await(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); list.add(thread); } // 步骤二:等待10个线程全部进入wait方法 while (true) { if (lockedNum.get() != 10) { continue; } boolean allWaiting = true; for (Thread thread : list) { if (thread.getState() != Thread.State.WAITING) { allWaiting = false; break; } } if (allWaiting) { break; } } // 步骤三:唤醒10个线程 lock.lock(); condition.signalAll(); lock.unlock(); // 步骤四:等待10个线程全部执行完毕 for (Thread thread : list) { thread.join(); } } } 条件队列 耗时1 耗时2 耗时3 耗时4 耗时5 平均耗时(ms)
JDK   5000   5076   5054   5089   4942   5032  
AQS   5358   5440   5444   5473   5472   5437  
4.2、基准测试Q&A

基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwjwdw.html