异常处理CompletionStage exceptionally(fn); CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn);
课后思考
创建采购订单的时候,需要校验一些规则,例如最大金额是和采购员级别相关的。有同学利用 CompletableFuture 实现了这个校验的功能,逻辑很简单,首先是从数据库中把相关规则查出来,然后执行规则校验。你觉得他的实现是否有问题呢?
//采购订单 PurchersOrder po; CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(()->{ //在数据库中查询规则 return findRuleByJdbc(); }).thenApply(r -> { //规则校验 return check(po, r); }); Boolean isOk = cf.join();解答
没有进行异常处理,
要指定专门的线程池做数据库查询(读数据库属于io操作,应该放在单独线程池,避免线程饥饿)
如果检查和查询都比较耗时,那么应该像之前的对账系统一样,采用生产者和消费者模式,让上一次的检查和下一次的查询并行起来。
25 | CompletionService:如何批量执行异步任务?CompletionService 批量提交异步任务
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 异步向电商S1询价 Future<Integer> f1 = executor.submit( ()->getPriceByS1()); // 异步向电商S2询价 Future<Integer> f2 = executor.submit( ()->getPriceByS2()); // 异步向电商S3询价 Future<Integer> f3 = executor.submit( ()->getPriceByS3()); // 获取电商S1报价并保存 r=f1.get(); executor.execute(()->save(r)); // 获取电商S2报价并保存 r=f2.get(); executor.execute(()->save(r)); // 获取电商S3报价并保存 r=f3.get(); executor.execute(()->save(r));Future 实现“询价”程序
如上所示,需要一个个的get然后执行下一步操作,f1, f2, f3需要一次等待
// 创建阻塞队列 BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(); //电商S1报价异步进入阻塞队列 executor.execute(()-> bq.put(f1.get())); //电商S2报价异步进入阻塞队列 executor.execute(()-> bq.put(f2.get())); //电商S3报价异步进入阻塞队列 executor.execute(()-> bq.put(f3.get())); //异步保存所有报价 for (int i=0; i<3; i++) { Integer r = bq.take(); executor.execute(()->save(r)); }阻塞队列的优化方案
如上所示 阻塞队列也能解决这种互相等待操作造成的资源浪费
利用 CompletionService 实现询价系统
CompletionService 内部实现了一个阻塞队列,默认 LinkedListBlockingQueue(建议覆盖,因为默认的是无界的)
CompletionService 会把Future对象放到阻塞队列中。
代码实现如下所示
TODO 验证是否是先执行完的,先入队
利用 CompletionService 实现 Dubbo 中的 Forking Cluster
Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 用于保存Future对象 List<Future<Integer>> futures = new ArrayList<>(3); //提交异步任务,并保存future到futures futures.add( cs.submit(()->geocoderByS1())); futures.add( cs.submit(()->geocoderByS2())); futures.add( cs.submit(()->geocoderByS3())); // 获取最快返回的任务执行结果 Integer r = 0; try { // 只要有一个成功返回,则break for (int i = 0; i < 3; ++i) { r = cs.take().get(); //简单地通过判空来检查是否成功返回 if (r != null) { break; } } } finally { //取消所有任务 for(Future<Integer> f : futures) f.cancel(true); } // 返回结果 return r;总结
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。