CompleteFuture实现简单的任务编排实践

CompleteFuture实现简单的任务编排实践 一:前言

​ CompleteFuture是java8 新提供的API,是对函数式编程思想的体现,提供了很多的对于函数式编程支持。不止有同步处理功能,还有异步处理能力。

通过函数式编程可以实现线程的简单任务编排。高效,整洁实现多线程异步编程。

二:详细介绍

​ CompleteFuture 提供的API中以ansy结尾的都是异步处理的。

异步执行任务,并返回结果:supplyAsync 异步处理,并返回结果,默认使用 ForkJoinPool.commonPool()线程池,同时提供支持自定义线程池的API。

CompletableFuture.supplyAsync(() -> "HELLO"); // 自定义线程池 CompletableFuture.supplyAsync(()->"hello",ES);

异步执行任务,不返回结果:runAsync

CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !")); CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !"),ES);

依赖单一阶段:thenApply thenApplyAsync

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "HELLO") .thenApply(a -> return a + " lili!"; });

组合与撰写:thenCompose() ,thenCombine(),thenCombineAsync.

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy")); // 执行结果: =====> hello lili lucy // mian线程下同步执行。 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy")) .thenCombineAsync(CompletableFuture.supplyAsync(() -> " how are you!"), (a, b) -> a + b); log.info("=====> {}", f1.get()); // 执行结果: =====> hello lili lucy how are you!

依赖两个任务中的一个:applyToEither() ,那个任务先结束,就依赖那个任务。

CompletableFuture<String> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();} return "lucy"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return "lili"; }), a -> "hello " + a); log.info("ret ====> {}",voidCompletableFuture.get()); // 执行结果: ret ====> hello lili 如果下面sleep改成3s,执行结果:ret ====> hello lucy

消费型,依赖单阶段: thenAccept(),thenAcceptAsync()

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello") .thenAcceptAsync(a -> { a = a + " lucy !"; log.info("ret ======> {}", a); }); log.info(" ======== end ========================"); // 执行结果:ret ======> hello lucy ! 而且是异步的,不会阻塞主线程,下面的end是先打印出来的

消费型,依赖两个任务都完成:thenAcceptBoth(),thenAcceptBothAsync()

CompletableFuture.supplyAsync(() -> "hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " lili"), (a, b) -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("=======>{}", a + b); }); // 执行结果:=======>hello lili

消费型:acceptEither() 依赖两个任务中先执行结束的那个

CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "lucy"; }).acceptEither(CompletableFuture.supplyAsync(() -> "lili"), a -> { log.info("hello {}", a); }); // 执行结果:hello lili

消费型,无论正常,还是异常都会消费处理,而且不会吞掉异常 whenComplete(),whenCompleteAsync()

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (ThreadLocalRandom.current().nextInt(2) < 2) { throw new RuntimeException("error"); } return "hello"; }).whenComplete((a, e) -> { log.info("ret -> {}", a + " lili!"); log.error("error", e); }); log.info("future.get()-->{}", future.get()); // 执行结果:ret -> null lili! 而且打印两次错误日志,一次是log打印,一次是get的时候。

产出型,无论正常还是异常都是处理,并返回结果。handle,handleAsync

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello") .handle((a, e) -> a + " lili!"); log.info("ret ==> {}", future.get()); // 执行结果:ret ==> hello lili!

产出型,异常时候进行处理,并产出,有点像try-catch(),exceptionally()

CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello") .thenApplyAsync(res -> res + " World") .thenApplyAsync( res -> { throw new RuntimeException(" test has error"); // return res + "!"; }) .exceptionally( e -> { log.error("exceptionally exception",e); return "出异常了。。"; }); log.info("ret ====> {}", f.get()); // 执行结果:ret ====> 出异常了。。 // 假如不抛出异常,执行结果:ret ====> Hello World!

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwyxxy.html