Java中的函数式编程(八)流Stream并行编程

在本系列文章的第一篇,我们提到了函数式编程的优点之一是“易于并发编程”。

Java作为一个多线程的语言,它通过 Stream 来提供了并发编程的便利性。

题外话:

严格来说,并发和并行是两个不同的概念。
“并发(Concurrency)”强调的是在同一时间开始执行多个任务,通常会涉及多线程之间的上下文切换;
“并行(Parallelism)”强调的是将一个大任务分解为多个小任务后,再同时执行这些小任务,得到多个中间结果后再汇总为一个最终结果。
但在多CPU和分布式的时代,并发和并行的概念联系越来越紧密。至少在Java的Stream中,我们可以将并发和并行理解为同一个意思:基于多线程技术,对一个大任务分拆为多个小任务,分配到不同的线程中执行,得到多个中间结果后再汇总为一个最终结果。

本文的示例代码可从gitee上获取:https://gitee.com/cnmemset/javafp

Stream的并行编程

并行编程是Stream的一个重要功能和特性。它的一个优点是:不管数据源是否线程安全,通过并行流(parallel stream)都可以轻松的实现并行编程。

Stream的并行编程,底层是基于 ForkJoinPool 技术来实现的。ForkJoinPool是Java 7引入的用于并行执行的任务框架,核心思想是将一个大任务拆分成多个小任务(即fork),然后再将多个小任务的处理结果汇总到一个结果上(即join)。此外,它也提供基本的线程池功能,譬如设置最大并发线程数,关闭线程池等。

在本系列之前的文章中,也零零散散的提到了一些关于并行编程的知识点。本文再做一个更系统的总结。

并行流(parallel stream)

Stream的并行操作都是基于并行流(parallel stream)。

生成一个并行流也非常简单:

1. 通过 Collection.parallelStream 方法可以得到一个并行流

2. 生成一个串行的Stream后,可以通过方法 BaseStream.parallel() 将一个串行流(serial stream)转换成并行流。当然,我们也可以通过方法 BaseStream.sequential() 将一个并行流转换成串行流。

通过方法 BaseStream.isParallel() 可以判断一个 stream 是否是并行流。

不管数据源是否线程安全(譬如ArrayList、HashSet,它们都不支持多线程),我们都可以使用parallelStream 轻松实现并行编程,不需要额外的线程同步操作,这是parallelStream 最大的优点。

顺序性

encounter order,指的是Stream中元素的出现顺序。如果觉得encounter order过于抽象,可以将它简单理解为数据源(data source)的元素顺序。本小节涉及到的有序或无序都特指encounter order。

一个Stream是否具备encounter order的有序性,取决于它的数据源(data source)和中间操作(intermediate operations)。例如,List或者数组的Steam是有序的,但HashSet的Steam则是无序的。而中间操作Stream.sorted,可以将一个无序的Stream转换成有序的;中间操作Stream.unordered 则将一个有序的Stream转换成无序的。

有趣的是,有些终止操作(terminal operations)是无视encounter order的。什么意思呢?以最常见的Stream.forEach 为例,在并行执行的时候,即使数据源是List,forEach方法处理元素的顺序也是无序的。要保证处理顺序,需要使用方法 Stream.forEachOrdered 。

示例代码:

public static void forEachExample() {     ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));       System.out.println("===forEach====");       // 在并行流中, forEach 方法是无视 Stream 的 encounter order 的     list.parallelStream().forEach(i -> {         System.out.println(i + ":thread-" + Thread.currentThread().getName());     });       System.out.println("===forEachOrdered====");       // 在并行流中, forEachOrdered 方法可以保持 encounter order     list.parallelStream().forEachOrdered(i -> {         System.out.println(i + ":thread-" + Thread.currentThread().getName());     }); }

上述代码的输出类似:

===forEach==== 3:thread-main 5:thread-ForkJoinPool.commonPool-worker-2 1:thread-main 4:thread-ForkJoinPool.commonPool-worker-3 2:thread-ForkJoinPool.commonPool-worker-1 ===forEachOrdered==== 1:thread-ForkJoinPool.commonPool-worker-4 2:thread-ForkJoinPool.commonPool-worker-1 3:thread-ForkJoinPool.commonPool-worker-1 4:thread-ForkJoinPool.commonPool-worker-1 5:thread-ForkJoinPool.commonPool-worker-1

可以看出,在并行执行时,forEach 是无视Stream的encounter order的,而 forEachOrdered 虽然也是在多线程环境下执行,但仍然可以保证Stream的encounter order。

在Stream并行编程中,理解encounter order很重要。因为对于大多数的Stream操作,即使是并行执行,如果Stream是有序的,那么操作后得到的Stream也保持有序。例如,对一个数据源为List [1,2,3] 的有序Stream,执行 map(x -> x * x) 操作后,结果一定是 [1, 4, 9]。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzpxw.html