和朱晔一起复习Java并发(一):线程池 (19)

在刚才的测试中,我们用到了定时任务线程池,这里我们再做一个测试。
我们来测试一下scheduleAtFixedRate和scheduleWithFixedDelay的区别。
在下面的代码里,我们分别进行两次测试:

一次是让任务在固定的频率去执行,一次是让任务具有固定的延迟去执行

然后1秒后关闭定时任务线程池(也是通过这个定时任务的调度功能去实现)

主线程一直等待到线程池终止

@Test public void test1() throws InterruptedException { AtomicInteger scheduleAtFixedRateTotal = new AtomicInteger(); ScheduledExecutorService scheduleAtFixedRateExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture scheduleAtFixedRateTotalFuture = scheduleAtFixedRateExecutorService.scheduleAtFixedRate(() -> { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } log.info("scheduleAtFixedRate:" + scheduleAtFixedRateTotal.incrementAndGet()); }, 0, 100, TimeUnit.MILLISECONDS); scheduleAtFixedRateExecutorService.schedule(() -> scheduleAtFixedRateTotalFuture.cancel(false), 1, TimeUnit.SECONDS); while (!scheduleAtFixedRateTotalFuture.isDone()) TimeUnit.MILLISECONDS.sleep(1); Assert.assertEquals(11, scheduleAtFixedRateTotal.get()); AtomicInteger scheduleWithFixedDelayTotal = new AtomicInteger(); ScheduledExecutorService scheduleWithFixedDelayExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture scheduleWithFixedDelayFuture = scheduleWithFixedDelayExecutorService.scheduleWithFixedDelay(() -> { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } log.info("scheduleWithFixedDelay:" + scheduleWithFixedDelayTotal.incrementAndGet()); }, 0, 100, TimeUnit.MILLISECONDS); scheduleWithFixedDelayExecutorService.schedule(() -> scheduleWithFixedDelayFuture.cancel(false), 1, TimeUnit.SECONDS); while (!scheduleWithFixedDelayFuture.isDone()) TimeUnit.MILLISECONDS.sleep(1); Assert.assertEquals(5, scheduleWithFixedDelayTotal.get()); }

通过断言我们也可以知道,固定频率执行任务"忽略"了任务执行的时间,所以能执行更多次数,最终在1秒执行了11次。固定延迟执行任务是在任务完成后再延迟固定的间隔去执行,最终只能在1秒左右的时间执行5次。
下面是输出:

image_1dfl9s69u19tgh3s10r911j21t5g3k.png-520.1kB

在这里,忽略两个字,我打上了双引号,那是因为对于单线程的定时任务线程池,任务如果执行太慢,比频率还慢,那么线程池也没这个能力去fixRate执行。这里你可以尝试一下把休眠时间100ms修改为200ms,重新运行后结果如下:

image_1dfladh1p1k721fl31dc1tuv1t1e41.png-367.4kB


什么?单元测试还是通过的?难道不是应该运行了5次就结束了吗?注意观察,我们的线程池并没有在1秒后结束,而是维持了2秒多,还是老问题,因为是单线程,我们的关闭线程池的那个任务也无法及时得到执行。
尝试修改代码,把cancel任务放到独立的线程池去执行:

Executors.newSingleThreadScheduledExecutor().schedule(() -> scheduleAtFixedRateTotalFuture.cancel(false), 1, TimeUnit.SECONDS);

(或者也可以使用2个线程的定时任务线程池)
可以看到,这次的结果符合预期:

image_1dflam0bu12d21qik532581163n4e.png-162.3kB

ForkJoin线程池

从Java 1.8开始,Executors提供了newWorkStealingPool来获得一个ForkJoin线程池。
从命名可以看到,这是一个工作窃取线程池,传统的线程池具有公共的一个任务队列,在任务很多,任务执行很快(CPU密集型任务)的情况下,会发生比较多的竞争问题,而ForkJoin的话每一个线程都有自己的任务队列,如果自己的队列没有任务的话可以从其它队列窃取任务,这样确保了吞吐的同时,减少了竞争。我们写一段代码来比较一下:

image_1dflb1rdqa2q34inem1mb8end4r.png-194.3kB

@Slf4j public class ForkJoinPoolBenchmark { @Test public void test() throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); StopWatch stopWatch = new StopWatch(); ExecutorService normal = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ExecutorService forkjoin = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors()); stopWatch.start("normal"); LongStream.rangeClosed(1, 10000000).forEach(__->normal.submit(atomicLong::incrementAndGet)); normal.shutdown(); normal.awaitTermination(1, TimeUnit.HOURS); stopWatch.stop(); long r = atomicLong.get(); stopWatch.start("forkjoin"); LongStream.rangeClosed(1, 10000000).forEach(__->forkjoin.submit(atomicLong::incrementAndGet)); forkjoin.shutdown(); forkjoin.awaitTermination(1, TimeUnit.HOURS); stopWatch.stop(); log.info(stopWatch.prettyPrint()); log.info("result:{},{}", r, atomicLong.get()); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgdxwx.html