CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
课后思考
本章使用 CompletionService 实现了一个询价应用的核心功能,后来又有了新的需求,需要计算出最低报价并返回,下面的示例代码尝试实现这个需求,你看看是否存在问题呢?
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 异步向电商S1询价 cs.submit(()->getPriceByS1()); // 异步向电商S2询价 cs.submit(()->getPriceByS2()); // 异步向电商S3询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 // 并计算最低报价 AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE); for (int i=0; i<3; i++) { executor.execute(()->{ Integer r = null; try { r = cs.take().get(); } catch (Exception e) {} save(r); m.set(Integer.min(m.get(), r)); }); } return m;Copy
解答
以上代码无法保证三个线程 和 主线程 return m的顺序。可以加个CountDownLatch 来保证线程执行完成 再让主线程return。
26 | Fork/Join:单机版的MapReduce笔记
从任务的角度看待并发编程
不难发现线程池、Future、CompletableFuture、CompletionService都是站在任务的角度看待并发编程,将视野扩大,不再将精力都浪费在线程的协作上
对于简单的并行任务,可以通过线程池+Future
如果任务之间有耦合关系,不管是AND或者OR,都可以CompletableFuture来解决
如果是批量任务,可以通过CompletionService来解决
并发编程关注的三个问题
互斥、
实现互斥的方案:锁:Synchronized、ReentrantLock、ReadWriteLock、StampedLock、、、
分工、
实现分工的方案:线程池、CompletionService、CompletableFuture
协作
线程协作的方案:管程:synchronized+wait+notify、Lock+Condition、CountDownLatch、CyclicBarrier
从更高的视野看待并发编程
任务
线程任务调度方式:线程池+Future、CompletionService、CompletableFuture
任务类型
并行任务:线程池 + Future + 阻塞队列、CompletableFuture
批量任务:CompletionService
聚合任务:CompletableFuture
分治任务:Fork/Join
Fork/Join
理解
Fork对应任务分解
join对应任务聚合
工具
ForkJoinPool
生产者-消费者模型
任务窃取当pool中的呃线程空闲了,会去窃取其他工作队列中的任务。TODO 确认是一个等待池还是每个线程有一个等待池
ForkJoinTask
抽象实现类:RecursiveAction(compute()无返回值)
抽象实现类:RecursiveTask(compute()有返回值)
以上两者的关系类似 ThreadPoolExecutor 和 Runnable的关系
总结
Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,
不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool
为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,
这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,
但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。
所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。
笔记
注意while(true)的问题
一般while(true)需要break条件,一般是超时时间,不然容易导致死循环
while(true) & Lock和Condition里面的活锁问题
notifyAll和signalAll
一般使用All比notify和signal更安全
Semaphore 需要锁中锁
Semaphore 允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说 Semaphore 需要锁中锁。
个人理解,Semaphore不是一个锁,只是一个并发工具,所以遇到共享变量问题,依然是需要锁的
锁的申请和释放要成对出现
回调总要关心执行线程是谁
当看到回调函数的时候,一定问一问执行回调函数的线程是谁
共享线程池:有福同享就要有难同当
对于I/O密集型和CPU密集型的线程池要做业务隔离
线上问题定位的利器:线程栈 dump
善于利用JSP和jstack命令
28 | Immutability模式:如何利用不变性解决并发问题?快速实现具备不可变性的类
将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了
将类也设置成 final 保证不能通过继承修改该类的final属性
String
String 类是final的,就是为了保证线程安全的
String 类的字符串替换操作 String#replace方法如何保证的线程安全?