源码分析:Phaser 之更灵活的同步屏障

Phaser 是 JDK 1.7 开始提供的一个可重复使用的同步屏障,功能类似于CyclicBarrier和CountDownLatch,但使用更灵活,支持对任务的动态调整,并支持分层结构来达到更高的吞吐量。

Registration(注册)

与其他屏障的情况不同,在 Phaser 上注册同步的参与方的数量可能随时间而变化。任务可以在任何时候注册(使用方法register、bulkRegister或建立初始参与方数量的构造函数),可以在任何到达时取消注册(使用arriveAndDeregister),注册和注销只影响内部计数,任务无法查询它们是否已注册。

Synchronization(同步)

像CyclicBarrier,Phaser也可以重复await。方法arriveAndAwaitAdvance()有效果类似于CyclicBarrier.await 。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制到达phaser 和 等待其他线程 的动作,通过下面两种类型的方法:

Arrival(到达机制)
arrive和arriveAndDeregister方法记录到达状态。
这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。
重写此方法类似于为CyclicBarrier提供一个barrierAction(执行的命令线程),但比它更灵活。

Waiting(等待机制)
awaitAdvance方法需要一个表示 arrival phase number 的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务。

Termination(终止机制)

可以用isTerminated方法检查phaser的终止状态。

在终止时,所有同步方法立刻返回一个负值。

在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止。

Tiering(分层结构)

Phaser支持分层结构(树状构造)来减少竞争。

注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。
在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。

子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。

Monitoring(状态监控)

由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。

分层运行示意图

使用示例 void runTasks(List<Runnable> tasks) throws InterruptedException{ // "1" to register self final Phaser phaser = new Phaser(1); // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { @Override public void run() { // await all creation // 类似 CountDownLatch.await() 和 CyclicBarrier.await() System.out.println("等待所有的任务+1"); phaser.arriveAndAwaitAdvance(); task.run(); } }.start(); } // allow threads to start and deregister self TimeUnit.SECONDS.sleep(1); System.out.println("jinglingwang.cn 放行。。。。。。"); // 类似 CountDownLatch.countDown() 减到了0 和 CyclicBarrier 中的最后一个线程调用了await() phaser.arriveAndDeregister(); } 多阶段运行示例

这里的阶段有点类似多次使用CyclicBarrier,并不是Phaser的分层

void runTasks2() { // 定义阶段数 int phases = 3; // 进入下一个阶段需要的参与数(线程数) int parties = 5; // 自定义onAdvance https://jinglingwang.cn Phaser phaser = new Phaser(parties){ @Override protected boolean onAdvance(int phase,int registeredParties){ System.out.println("阶段phase: "+(phase +1) +" 执行完毕"); return phase > phases || registeredParties == 0; } }; for(int i = 1; i <= parties; i++){ new Thread(()->{ for(int j = 1; j <= phases; j++){ System.out.println(Thread.currentThread().getName() + " doing 阶段:"+ j); phaser.arriveAndAwaitAdvance(); } },"Thread-"+i).start(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyfy.html