源码分析:CyclicBarrier 之循环栅栏

CyclicBarrier 是一个同步辅助工具,允许一组线程全部等待彼此达到共同屏障点,且等待的线程被释放后还可以重新使用,所以叫做Cyclic(循环的)。

应用场景

比如出去旅行时,导游需要等待所有的客人到齐后,导游才会给大家讲解注意事项等

官方示例

在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

public class Solver { AtomicInteger sum = new AtomicInteger(0); // 自己新增的一个标识,true代表所有的计算完成了 volatile boolean done = false; final int N; final int[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } @Override public void run() { while (!done()) { int rowSum = Arrays.stream(data[myRow]).sum(); // 计算行的和 System.out.println("processRow(myRow):" + rowSum); sum.addAndGet(rowSum); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } private boolean done(){ return done; } public Solver(int[][] matrix) throws InterruptedException{ data = matrix; N = matrix.length; Runnable barrierAction = () -> { System.out.println("mergeRows(...):"+sum.get()); // 输出二维数组的总和 done = true; }; barrier = new CyclicBarrier(N, barrierAction); List<Thread> threads = new ArrayList<Thread>(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads){ thread.join(); } } public static void main(String[] args) throws InterruptedException{ int[][] matrix = {{1,2,3},{4,5,6}}; Solver solver = new Solver(matrix); } } 源码分析 主要的属性 /** 防护栅栏入口的锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 等待直到跳闸的条件 */ private final Condition trip = lock.newCondition(); /** 构造方法参数,在障碍被释放之前必须调用等待的线程数 */ private final int parties; /* 越过栅栏时运行的命令 */ private final Runnable barrierCommand; /** 当前的一代,控制CyclicBarrier的循环 */ private Generation generation = new Generation(); /** 记录仍在等待的参与方线程数量,初始值等于parties */ private int count; 主要内部类 /** 代:屏障的每次使用都表示为一个生成实例 */ private static class Generation { boolean broken = false; // 标识当前的栅栏已破坏或唤醒,jinglingwang.cn } 构造方法

一共有两个构造方法,第一个构造方法仅需要传入一个int值,表示调用等待的线程数;第二个构造方法多了一个runnable接口,当所有的线程越过栅栏时执行的命令,没有则为null;

public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; // Runnable 命令线程 } await() 方法

每个需要在栅栏处等待的线程都需要显式地调用这个方法。

public int await() throws InterruptedException, BrokenBarrierException { try { // 调用await方法,0:不超时 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } dowait() 方法

主要的障碍代码

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 当前锁 final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 当前代 final Generation g = generation; // 检查当前代的状态,是否要抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); // 当前线程被中断了 if (Thread.interrupted()) { // 屏障被打破 breakBarrier(); throw new InterruptedException(); } // count减一 int index = --count; // index等于0,说明最后一个线程到达了屏障处 if (index == 0) { // tripped boolean ranAction = false; // 标识Runnable 命令线程是否有执行 try { final Runnable command = barrierCommand; // 第二个构造方法的入参,需要运行的命令线程 if (command != null) command.run(); // 执行命令线程。by:jinglingwang.cn ranAction = true; nextGeneration(); // 更新重置整个屏障 return 0; } finally { if (!ranAction) // ranAction 没有被设置成true;被中断了 breakBarrier(); } } // 循环直到跳闸,断开,中断或超时 for (;;) { try { if (!timed) // 没有设超时时间,直接调用条件锁的await方法阻塞等待 trip.await(); else if (nanos > 0L) // 有超时时间 nanos = trip.awaitNanos(nanos); //调用条件锁的await方法阻塞等待一段时间 } catch (InterruptedException ie) { // 捕获中断异常 if (g == generation && ! g.broken) { breakBarrier(); //被中断,当前代会被标识成已被破坏 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 如果上面代码没有异常,理论上只有被唤醒后才会执行到下面的代码 // 再次检查当前代是否已经被破坏 if (g.broken) throw new BrokenBarrierException(); // 正常来说,最后一个线程在执行上面的代码时,会调用nextGeneration,重新生成generation // 所以线程被唤醒后,这里条件会成立 if (g != generation) return index; // 超时检查 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); //抛出超时异常 } } } finally { // 释放锁 lock.unlock(); } } /** 重置屏障,回到初始状态,说明可以重复使用*/ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; // 重置等的参与方线程数量计数,回到最初的状态 generation = new Generation(); } private void breakBarrier() { // 标识当前的栅栏状态 generation.broken = true; count = parties; // 条件锁,唤醒所有等待的线程,jinglingwang.cn trip.signalAll(); }

dowait() 方法过程总结:

参与方的多个线程执行逻辑代码后,分别调用await方法

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxzjd.html