CompleteFuture实现简单的任务编排实践 一:前言
CompleteFuture是java8 新提供的API,是对函数式编程思想的体现,提供了很多的对于函数式编程支持。不止有同步处理功能,还有异步处理能力。
通过函数式编程可以实现线程的简单任务编排。高效,整洁实现多线程异步编程。
二:详细介绍 CompleteFuture 提供的API中以ansy结尾的都是异步处理的。
异步执行任务,并返回结果:supplyAsync 异步处理,并返回结果,默认使用 ForkJoinPool.commonPool()线程池,同时提供支持自定义线程池的API。
CompletableFuture.supplyAsync(() -> "HELLO"); // 自定义线程池 CompletableFuture.supplyAsync(()->"hello",ES);异步执行任务,不返回结果:runAsync
CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !")); CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !"),ES);依赖单一阶段:thenApply thenApplyAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "HELLO") .thenApply(a -> return a + " lili!"; });组合与撰写:thenCompose() ,thenCombine(),thenCombineAsync.
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy")); // 执行结果: =====> hello lili lucy // mian线程下同步执行。 CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy")) .thenCombineAsync(CompletableFuture.supplyAsync(() -> " how are you!"), (a, b) -> a + b); log.info("=====> {}", f1.get()); // 执行结果: =====> hello lili lucy how are you!依赖两个任务中的一个:applyToEither() ,那个任务先结束,就依赖那个任务。
CompletableFuture<String> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();} return "lucy"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return "lili"; }), a -> "hello " + a); log.info("ret ====> {}",voidCompletableFuture.get()); // 执行结果: ret ====> hello lili 如果下面sleep改成3s,执行结果:ret ====> hello lucy消费型,依赖单阶段: thenAccept(),thenAcceptAsync()
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello") .thenAcceptAsync(a -> { a = a + " lucy !"; log.info("ret ======> {}", a); }); log.info(" ======== end ========================"); // 执行结果:ret ======> hello lucy ! 而且是异步的,不会阻塞主线程,下面的end是先打印出来的消费型,依赖两个任务都完成:thenAcceptBoth(),thenAcceptBothAsync()
CompletableFuture.supplyAsync(() -> "hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " lili"), (a, b) -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("=======>{}", a + b); }); // 执行结果:=======>hello lili消费型:acceptEither() 依赖两个任务中先执行结束的那个
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "lucy"; }).acceptEither(CompletableFuture.supplyAsync(() -> "lili"), a -> { log.info("hello {}", a); }); // 执行结果:hello lili消费型,无论正常,还是异常都会消费处理,而且不会吞掉异常 whenComplete(),whenCompleteAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (ThreadLocalRandom.current().nextInt(2) < 2) { throw new RuntimeException("error"); } return "hello"; }).whenComplete((a, e) -> { log.info("ret -> {}", a + " lili!"); log.error("error", e); }); log.info("future.get()-->{}", future.get()); // 执行结果:ret -> null lili! 而且打印两次错误日志,一次是log打印,一次是get的时候。产出型,无论正常还是异常都是处理,并返回结果。handle,handleAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello") .handle((a, e) -> a + " lili!"); log.info("ret ==> {}", future.get()); // 执行结果:ret ==> hello lili!产出型,异常时候进行处理,并产出,有点像try-catch(),exceptionally()
CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello") .thenApplyAsync(res -> res + " World") .thenApplyAsync( res -> { throw new RuntimeException(" test has error"); // return res + "!"; }) .exceptionally( e -> { log.error("exceptionally exception",e); return "出异常了。。"; }); log.info("ret ====> {}", f.get()); // 执行结果:ret ====> 出异常了。。 // 假如不抛出异常,执行结果:ret ====> Hello World!