并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

1. 前言

在实际开发中,碰上CPU密集且执行时间非常耗时的任务,通常我们会选择将该任务进行分割,以多线程方式同时执行若干个子任务,等这些子任务都执行完后再将所得的结果进行合并。这正是著名的map-reduce思想,不过map-reduce通常被用在分布式计算的语境下,这里举这个例子只是为了说明对多线程并发执行流程进行控制的重要性,比如某些线程必须等其他线程执行完后才能开始它的工作。使用jdk中的内置锁或者重入锁配合等待通知机制可以实现这个需求,但是会比较麻烦。因为不管是内置还是重入锁,它们关注的重点在于如何协调多线程对共享资源的访问,而不是协调特定线程的执行次序,完成复杂的并发流程控制。好在JDK在并发包下提供了CountDownLatch,CyclicBarrier,Semaphore等并发工具,可以让我们站在更高的角度思考并解决这个问题。

2. 闭锁CountDownLatch 2.1 CountDownLatch功能简介

CountDownLatch通常称之为闭锁。它可以使一个或一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。该计数器的初始值由用户在创建闭锁对象时通过传入的构造参数决定,如下所示

/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

默认计数器初始值不能小于0,否则将抛出异常。

当计数器的值大于0时,该闭锁处于关闭状态,调用闭锁的await()方法将导致当前线程在闭锁上等待。

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

但是我们可以通过调用闭锁的countDown()方法来使闭锁的计数值减少

public void countDown() { sync.releaseShared(1); }

每调用一次countDown()方法都会使闭锁的计数值减少1,所以闭锁的计数器准确来说是个倒计数器。当计数值减少到0时,阻塞在闭锁上的线程将被唤醒从而继续执行。下面以一个类似map-reduce的例子来对CountDownLatch的用法做讲解。

2.2 使用CountDownLatch

为了计算一个CPU密集型的大任务,将该任务分割成10个子任务,交由开启的10个子线程去执行。当所有子任务执行完毕后,主线程再执行后续的工作。任务的执行时间以线程休眠进行模拟,整个流程以日志方式进行记录。完整代码如下

/** * @author: takumiCX * @create: 2018-09-17 **/ class CountDownLatchTest { static CountDownLatch countDownLatch; public static void main(String[] args) throws InterruptedException { int count=10; //初始化计数器值为10 countDownLatch=new CountDownLatch(count); //开启10个子线程执行子任务 for(int i=0;i<count;i++){ Thread thread = new Thread(new CountDownThread(countDownLatch,i)); thread.start(); } //主线程等待,直到所有子任务完成 countDownLatch.await(); //模拟主线程执行后续工作 TimeUnit.SECONDS.sleep(1); System.out.println("任务执行完毕!"); } private static class CountDownThread implements Runnable{ CountDownLatch countDownLatch; //子任务序号 int taskNum; public CountDownThread(CountDownLatch countDownLatch, int taskNum) { this.countDownLatch = countDownLatch; this.taskNum = taskNum; } @Override public void run() { try { //模拟子任务的执行 TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } //任务执行完毕,则调用countDown方法使计数器值减少1 countDownLatch.countDown(); System.out.println("子任务:"+taskNum+" 执行完毕!"); } } }

结果如下所示

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

可以看到主线程在所有子任务执行完前必须在闭锁上等待。当最后一个子任务完成后,它将被唤醒,从而可以继续之后的工作。

2.3 CountDownLatch原理浅析

CountDownLatch底层也是通过AQS实现的。和ReentrentLock以独占的方式获取和释放同步状态不同,CountDownLatch是以共享的方式获取和释放同步状态的。独占式和共享式的区别主要有以下几点:

1.独占式一次只允许一个线程获取同步状态,而共享式一次允许多个线程同时获取同步状态。

2.当在同步队列等待的线程被唤醒然后成功获取同步状态时,它还必须唤醒后续结点中的线程,并将这个过程传递下去,使得多个线程可以同时获取到同步状态。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzypg.html