深入理解线程通信 (2)

CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。

private static void countDownLatch() throws Exception{ int thread = 3 ; long start = System.currentTimeMillis(); final CountDownLatch countDown = new CountDownLatch(thread); for (int i= 0 ;i<thread ; i++){ new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(2000); countDown.countDown(); LOGGER.info("thread end"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } countDown.await(); long stop = System.currentTimeMillis(); LOGGER.info("main over total time={}",stop-start); }

输出结果:

2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理

初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。

该方法会将 AQS 内置的一个 state 状态 -1 。

最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。

CyclicBarrier 并发工具 private static void cyclicBarrier() throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ; new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(5000); cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); LOGGER.info("main thread"); }

CyclicBarrier 中文名叫做屏障或者是栅栏,也可以用于线程间通信。

它可以等待 N 个线程都达到某个状态后继续运行的效果。

首先初始化线程参与者。

调用 await() 将会在所有参与者线程都调用之前等待。

直到所有参与者都调用了 await() 后,所有线程从 await() 返回继续后续逻辑。

运行结果:

2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread 2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something 2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something 2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something

可以看出由于其中一个线程休眠了五秒,所有其余所有的线程都得等待这个线程调用 await() 。

该工具可以实现 CountDownLatch 同样的功能,但是要更加灵活。甚至可以调用 reset() 方法重置 CyclicBarrier (需要自行捕获 BrokenBarrierException 处理) 然后重新执行。

线程响应中断 public class StopThread implements Runnable { @Override public void run() { while ( !Thread.currentThread().isInterrupted()) { // 线程执行具体逻辑 System.out.println(Thread.currentThread().getName() + "运行中。。"); } System.out.println(Thread.currentThread().getName() + "退出。。"); } public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new StopThread(), "thread A"); thread.start(); System.out.println("main 线程正在运行") ; TimeUnit.MILLISECONDS.sleep(10) ; thread.interrupt(); } }

输出结果:

thread A运行中。。 thread A运行中。。 thread A退出。。

可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true。

并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。

但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。

线程池 awaitTermination() 方法

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppsyj.html