并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore) (3)

至此,CountDownLatch的原理就搞明白了,它是以AQS的共享模式来实现复杂的并发流程控制的。当其内部的计数器不为0时,调用其await方法将导致线程加入同步队列并阻塞。当调用countDown方法使计数器的值为0时,会唤醒队列中第一个等待的线程,之后由该线程唤醒后面的线程,以此类推,直到阻塞在闭锁上的线程都被成功唤醒。

3.循环屏障CyclicBarrier 3.1 CyclicBarrier功能简介

CyclicBarrier通常称为循环屏障。它和CountDownLatch很相似,都可以使线程先等待然后再执行。不过CountDownLatch是使一批线程等待另一批线程执行完后再执行;而CyclicBarrier只是使等待的线程达到一定数目后再让它们继续执行。故而CyclicBarrier内部也有一个计数器,计数器的初始值在创建对象时通过构造参数指定,如下所示

public CyclicBarrier(int parties) { this(parties, null); }

每调用一次await()方法都将使阻塞的线程数+1,只有阻塞的线程数达到设定值时屏障才会打开,允许阻塞的所有线程继续执行。除此之外,CyclicBarrier还有几点需要注意的地方:

1.CyclicBarrier的计数器可以重置而CountDownLatch不行,这意味着CyclicBarrier实例可以被重复使用而CountDownLatch只能被使用一次。而这也是循环屏障循环二字的语义所在。

2.CyclicBarrier允许用户自定义barrierAction操作,这是个可选操作,可以在创建CyclicBarrier对象时指定

public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

一旦用户在创建CyclicBarrier对象时设置了barrierAction参数,则在阻塞线程数达到设定值屏障打开前,会调用barrierAction的run()方法完成用户自定义的操作。

3.2 使用CyclicBarrier

还是以多线程分割大任务并发执行的例子来进行讲解,不过这次情况要稍微复杂些。线程在执行完分配给它的子任务后不能立即退出,必须等待所有任务都完成后再执行释放资源的操作。而主线程在所有子任务都执行完毕后也要执行特定的操作,且该操作在线程释放资源前。所有操作都以打印日志的方式进行模拟。代码如下:

/** * @author: takumiCX * @create: 2018-09-18 **/ public class CyclicBarrierTest { static CyclicBarrier cyclicBarrier; public static void main(String[] args) { int count = 10; //当所有子任务都执行完毕时,barrierAction的run方法会被调用 cyclicBarrier = new CyclicBarrier(count, () -> System.out.println("执行barrierAction操作!")); //开启多个线程执行子任务 for(int i=0;i<count;i++){ new Thread(new CyclicBarrierThread(cyclicBarrier,i)).start(); } } private static class CyclicBarrierThread implements Runnable { public CyclicBarrier cyclicBarrier; //任务序号 public int taskNum; public CyclicBarrierThread(CyclicBarrier cyclicBarrier, int taskNum) { this.cyclicBarrier = cyclicBarrier; this.taskNum = taskNum; } @Override public void run() { //执行子任务 System.out.println("子任务:"+taskNum+" 执行完毕!"); try { //等待所有子任务执行完成 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } //释放资源 System.out.println("线程:"+taskNum+" 释放资源!"); } } }

开启10个线程执行子任务,每个线程执行完子任务后在CyclicBarrier上等待。等到所有子任务完成后,用户设置自定义的barrierAction操作即被执行,之后屏障正式打开,阻塞的所有线程将完成释放资源的操作。
结果如下图所示

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

3.3 CyclicBarrier原理浅析

CyclicBarrier内部使用ReentrentLock来实现线程同步,而通过Condition来实现线程的阻塞和唤醒。当计数器值为0时,首先会执行用户自定义的barrierAction操作。

int index = --count; //计数器值 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //用户自定义的barrierAction if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }

之后再进行阻塞线程的唤醒,以及将计数器重置为初始值。这部分代码在nextGeneration()中

private void nextGeneration() { // signal completion of last generation trip.signalAll(); //唤醒所有的阻塞线程 // set up next generation count = parties; //计数器重置为初始值 generation = new Generation(); } 4. 信号量Semaphore 4.1 Semaphore功能简介

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzypg.html