CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了 (2)

上述的例子还是过于简单,Oracle 官网 CountDownLatch 说明 有两个非常经典的使用场景,示例很简单,强烈建议查看相关示例代码,打开使用思路。我将两个示例代码以图片的形式展示在此处:

官网示例1

第一个是开始信号 startSignal,阻止任何工人 Worker 继续工作,直到司机 Driver 准备好让他们继续工作

第二个是完成信号 doneSignal,允许司机 Driver 等待,直到所有的工人 Worker 完成。

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

官网示例2

另一种典型的用法是将一个问题分成 N 个部分 (比如将一个大的 list 拆分成多分,每个 Worker 干一部分),Worker 执行完自己所处理的部分后,计数器减1,当所有子部分完成后,Driver 才继续向下执行

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

结合官网示例,相信你已经可以结合你自己的业务场景解,通过 CountDownLatch 解决一些串行瓶颈来提高运行效率了,会用还远远不够,咱得知道 CountDownLatch 的实现原理

源码分析

CountDownLatch 是 AQS 实现中的最后一个内容,有了前序文章的知识铺垫:

Java AQS队列同步器以及ReentrantLock的应用

Java AQS共享式获取同步状态及Semaphore的应用分析

当你看到 CountDownLatch 的源码内容,你会高兴的笑起来,内容真是太少了

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

展开类结构全部内容就这点东西

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

既然 CountDownLatch 是基于 AQS 实现的,那肯定也离不开对同步状态变量 state 的操作,我们在初始化的时候就将计数器的值赋值给了state

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

另外,它可以多个线程同时获取,那一定是基于共享式获取同步变量的用法了,所以它需要通过重写下面两个方法控制同步状态变量 state :

tryAcquireShared()

tryReleaseShared()

CountDownLatch 暴露给使用者的只有 await() 和 countDown() 两个方法,前者是阻塞自己,因为只有获取同步状态才会才会出现阻塞的情况,那自然是在 await() 的方法内部会用到 tryAcquireShared();有获取就要有释放,那后者 countDown() 方法内部自然是要用到 tryReleaseShared() 方法了

PS:如果你对上面这个很自然的推断理解有困难,强烈建议你看一下前序文章的铺垫,以防止知识断层带来的困扰

await()

先来看 await() 方法, 从方法签名上看,该方法会抛出 InterruptedException, 所以它是可以响应中断的,这个我们在 Java多线程中断机制 中明确说明过

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

其内部调用了同步器提供的模版方法 acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果监测到中断标识为true,会重置标识,然后抛出 InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 调用重写的 tryAcquireShared 方法,该方法结果如果大于零则直接返回,程序继续向下执行,如果小于零,则会阻塞自己 if (tryAcquireShared(arg) < 0) // state不等于0,则尝试阻塞自己 doAcquireSharedInterruptibly(arg); }

重写的 tryAcquireShared 方法非常简单, 就是判断同步状态变量 state 的值是否为 0, 如果为零 (子线程已经全部执行完毕)则返回1, 否则返回 -1

protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

如果子线程没有全部执行完毕,则会通过 doAcquireSharedInterruptibly 方法阻塞自己,这个方法在 Java AQS共享式获取同步状态及Semaphore的应用分析 中已经仔细分析过了,这里就不再赘述了

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取同步装阿嚏,如果大于0,说明子线程全部执行完毕,直接返回 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

await() 方法的实现就是这么简单,接下来看看 countDown() 的实现原理

countDown() public void countDown() { sync.releaseShared(1); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwwggd.html