上述的例子还是过于简单,Oracle 官网 CountDownLatch 说明 有两个非常经典的使用场景,示例很简单,强烈建议查看相关示例代码,打开使用思路。我将两个示例代码以图片的形式展示在此处:
官网示例1第一个是开始信号 startSignal,阻止任何工人 Worker 继续工作,直到司机 Driver 准备好让他们继续工作
第二个是完成信号 doneSignal,允许司机 Driver 等待,直到所有的工人 Worker 完成。
官网示例2另一种典型的用法是将一个问题分成 N 个部分 (比如将一个大的 list 拆分成多分,每个 Worker 干一部分),Worker 执行完自己所处理的部分后,计数器减1,当所有子部分完成后,Driver 才继续向下执行
结合官网示例,相信你已经可以结合你自己的业务场景解,通过 CountDownLatch 解决一些串行瓶颈来提高运行效率了,会用还远远不够,咱得知道 CountDownLatch 的实现原理
源码分析CountDownLatch 是 AQS 实现中的最后一个内容,有了前序文章的知识铺垫:
Java AQS队列同步器以及ReentrantLock的应用
Java AQS共享式获取同步状态及Semaphore的应用分析
当你看到 CountDownLatch 的源码内容,你会高兴的笑起来,内容真是太少了
展开类结构全部内容就这点东西
既然 CountDownLatch 是基于 AQS 实现的,那肯定也离不开对同步状态变量 state 的操作,我们在初始化的时候就将计数器的值赋值给了state
另外,它可以多个线程同时获取,那一定是基于共享式获取同步变量的用法了,所以它需要通过重写下面两个方法控制同步状态变量 state :
tryAcquireShared()
tryReleaseShared()
CountDownLatch 暴露给使用者的只有 await() 和 countDown() 两个方法,前者是阻塞自己,因为只有获取同步状态才会才会出现阻塞的情况,那自然是在 await() 的方法内部会用到 tryAcquireShared();有获取就要有释放,那后者 countDown() 方法内部自然是要用到 tryReleaseShared() 方法了
PS:如果你对上面这个很自然的推断理解有困难,强烈建议你看一下前序文章的铺垫,以防止知识断层带来的困扰
await()先来看 await() 方法, 从方法签名上看,该方法会抛出 InterruptedException, 所以它是可以响应中断的,这个我们在 Java多线程中断机制 中明确说明过
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }其内部调用了同步器提供的模版方法 acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果监测到中断标识为true,会重置标识,然后抛出 InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 调用重写的 tryAcquireShared 方法,该方法结果如果大于零则直接返回,程序继续向下执行,如果小于零,则会阻塞自己 if (tryAcquireShared(arg) < 0) // state不等于0,则尝试阻塞自己 doAcquireSharedInterruptibly(arg); }重写的 tryAcquireShared 方法非常简单, 就是判断同步状态变量 state 的值是否为 0, 如果为零 (子线程已经全部执行完毕)则返回1, 否则返回 -1
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }如果子线程没有全部执行完毕,则会通过 doAcquireSharedInterruptibly 方法阻塞自己,这个方法在 Java AQS共享式获取同步状态及Semaphore的应用分析 中已经仔细分析过了,这里就不再赘述了
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取同步装阿嚏,如果大于0,说明子线程全部执行完毕,直接返回 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }await() 方法的实现就是这么简单,接下来看看 countDown() 的实现原理
countDown() public void countDown() { sync.releaseShared(1); }