库存-并发学习笔记 (11)

CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

课后思考

本章使用 CompletionService 实现了一个询价应用的核心功能,后来又有了新的需求,需要计算出最低报价并返回,下面的示例代码尝试实现这个需求,你看看是否存在问题呢?

// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 异步向电商S1询价 cs.submit(()->getPriceByS1()); // 异步向电商S2询价 cs.submit(()->getPriceByS2()); // 异步向电商S3询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 // 并计算最低报价 AtomicReference<Integer> m = new AtomicReference<>(Integer.MAX_VALUE); for (int i=0; i<3; i++) { executor.execute(()->{ Integer r = null; try { r = cs.take().get(); } catch (Exception e) {} save(r); m.set(Integer.min(m.get(), r)); }); } return m;

Copy

解答

以上代码无法保证三个线程 和 主线程 return m的顺序。可以加个CountDownLatch 来保证线程执行完成 再让主线程return。

26 | Fork/Join:单机版的MapReduce

笔记

任务的角度看待并发编程

不难发现线程池、Future、CompletableFuture、CompletionService都是站在任务的角度看待并发编程,将视野扩大,不再将精力都浪费在线程的协作上

对于简单的并行任务,可以通过线程池+Future

如果任务之间有耦合关系,不管是AND或者OR,都可以CompletableFuture来解决

如果是批量任务,可以通过CompletionService来解决

并发编程关注的三个问题

互斥、

实现互斥的方案:锁:Synchronized、ReentrantLock、ReadWriteLock、StampedLock、、、

分工、

实现分工的方案:线程池、CompletionService、CompletableFuture

协作

线程协作的方案:管程:synchronized+wait+notify、Lock+Condition、CountDownLatch、CyclicBarrier

从更高的视野看待并发编程

任务

线程任务调度方式:线程池+Future、CompletionService、CompletableFuture

任务类型

并行任务:线程池 + Future + 阻塞队列、CompletableFuture

批量任务:CompletionService

聚合任务:CompletableFuture

分治任务:Fork/Join

Fork/Join

理解

Fork对应任务分解

join对应任务聚合

工具

ForkJoinPool

生产者-消费者模型

任务窃取当pool中的呃线程空闲了,会去窃取其他工作队列中的任务。TODO 确认是一个等待池还是每个线程有一个等待池

ForkJoinTask

抽象实现类:RecursiveAction(compute()无返回值)

抽象实现类:RecursiveTask(compute()有返回值)
以上两者的关系类似 ThreadPoolExecutor 和 Runnable的关系

总结

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,
不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool
为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,
这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,
但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。
所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

27 | 并发工具类模块热点问题答疑

笔记

注意while(true)的问题

一般while(true)需要break条件,一般是超时时间,不然容易导致死循环

while(true) & Lock和Condition里面的活锁问题

notifyAll和signalAll

一般使用All比notify和signal更安全

Semaphore 需要锁中锁

Semaphore 允许多个线程访问一个临界区,这也是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,也就是说 Semaphore 需要锁中锁。

个人理解,Semaphore不是一个锁,只是一个并发工具,所以遇到共享变量问题,依然是需要锁的

锁的申请和释放要成对出现

回调总要关心执行线程是谁

当看到回调函数的时候,一定问一问执行回调函数的线程是谁

共享线程池:有福同享就要有难同当

对于I/O密集型和CPU密集型的线程池要做业务隔离

线上问题定位的利器:线程栈 dump

善于利用JSP和jstack命令

28 | Immutability模式:如何利用不变性解决并发问题?

快速实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了

将类也设置成 final 保证不能通过继承修改该类的final属性

String

String 类是final的,就是为了保证线程安全的

String 类的字符串替换操作 String#replace方法如何保证的线程安全?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygwfg.html