Java 8新特性之CompletableFuture:组合式异步编程(3)

List<Shop> shops = Arrays.asList(
                new Shop("one"),
                new Shop("two"),
                new Shop("three"),
                new Shop("four"));


        long start = System.nanoTime();
        List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
        System.out.print(str);
        long end = System.nanoTime();
        System.out.print((end - start) / 1000000);

[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110

每次调用getPrice方法都会阻塞1秒钟,对付这种我们可以使用并行流来进行优化:

List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());

1137

明显速度提升了,现在对四个商品查询 实现了并行,所以只耗时1秒多点,下面我们尝试CompletableFuture:

List<CompletableFuture<String>> str2 = shops.stream().map(shop->
                        CompletableFuture.supplyAsync(
                                ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());

我们使用工厂方法supplyAsync创建CompletableFuture对象,使用这种方式我们会得到一个List<CompletableFuture<String>>,列表中的每一个ComplatableFuture对象在计算完成后都会包含商品的名称。但是我们要求返回的是List<String>,所以需要等待所有的future执行完毕,再将里面的值提取出来,填充到列表中才能返回。

List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());

为了返回List<String> 需要对str2添加第二个map操作,对List中的所有future对象执行join操作,一个接一个的等待他们的运行结束。CompletableFuture类中的join和Future接口中的get方法有相同的含义,并且声明在Future接口中,唯一的不同是join不会抛出任何检测到的异常。

1149

现在使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个的防治两个map操作。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此每个创建CompletableFuture对象只能在前一个操作结束之后,再join返回计算结果。

更好的解决方式

并行流的版本工作的非常好,那是因为他可以并行处理8个任务,获取操作系统线程数量:

System.out.print(Runtime.getRuntime().availableProcessors());

但是如果列表是9个呢?那么执行结果就会2秒。因为他最多只能让8个线程处于繁忙状态。 但是使用CompletableFuture允许你对执行器Executor进行配置,尤其是线程池的大小,这是并行流API无法实现的。

定制执行器

//创建一个线程池,线程池的数目为100何商店数目二者中较小的一个值
        final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true); //使用守护线程 ---这种方式不会阻止程序的关停
                        return t;
                    }
                });

这个线程池是一个由守护线程构成的线程池,Java程序无法终止或退出正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。现在可以将执行器作为第二个参数传递给supplyAsync方法了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/e024cc39655ece152be623006fedb425.html