Java中的函数式编程(八)流Stream并行编程 (3)

但使用 ConcurrentHashMap 有个缺点:它不能保证 Stream 的 encounter order,所以只有当你确定元素的顺序不影响最终结果时,才使用与ConcurrentMap相关的方法。

最后,还要注意,只有在并行编程时,我们才要考虑使用 toConcurrentMap 或者 groupingByConcurrent 方法,否则会因为不必要的线程同步操作,反而影响了性能。

规约操作的注意事项

在本系列介绍规约操作的文章中,已经提到了很多关于并行编程的注意事项,本小节将它们汇总起来,供大家参考。

reduce(T, BinaryOperator)

reduce(T, BinaryOperator)的方法签名是:

T reduce(T identity, BinaryOperator<T> accumulator);

其中 T 是 Stream 的泛型类型。

参数 identity 是规约操作的初始值。

参数accumulator 要求满足结合律(associative)。

参数 accumulator 定义的函数必须满足结合律(associative),否则在一些顺序不确定的或并行的场景中会导致不正确的结果。

此外,如果是并行执行的话,对参数 identity 还有一个要求:对任意值 t,要满足 accumulator.apply(identity, t) == t 。否则,会导致错误的结果。

public static void reduceStream2() {     List<Integer> list = Arrays.asList(1, 3, 5, 7, 9);       // 这是正确的范例:因为数字 0 是累加操作的 identity 。     sum = list.parallelStream().reduce(0, (x, y) -> x + y); // 输出为 0+1+3+5+7+9 = 25     System.out.println(sum);       // 这是错误的范例:因为数字 5 并不是累加操作的 identity 。     sum = list.parallelStream().reduce(5, (x, y) -> x + y); // 本意是输出为 5+1+3+5+7+9 = 30,但实际上会输出一个比30大的数字。     System.out.println(sum); } reduce(U, BiFunction, BinaryOperator)

具体的方法签名是:

<U> U reduce(U identity,              BiFunction<U, ? super T, U> accumulator,              BinaryOperator<U> combiner);

其中 U 是返回值的类型,T 是 Stream 的泛型类型。

参数 identity 是规约操作的初始值。

参数accumulator 是与Stream中单个元素的合并操作,等同于函数 U apply(U u, T t)。

参数 combiner 是将并行执行得到的多个中间结果进行合并的操作,等同于函数 U apply(U u1, U u2)。

在并行编程中,对3个参数都有一些特殊要求:

1. 参数 combiner 必须满足结合律

2. 参数 identity,对于任意值 u,必须满足 combiner.apply(identity, u) == u

3. 参数 accumulator 和 combiner 两者必须兼容,即对于任意值 u 和 t,必须满足:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

collect(Supplier, BiConsumer, BiConsumer)

ollect方法的签名是:

<R> R collect(Supplier<R> supplier,               BiConsumer<R, ? super T> accumulator,               BiConsumer<R, R> combiner);

其中 R 是返回值的类型,通常是一个容器类(例如 Collection 或 Map)。T 是Stream中的元素类型。

参数 supplier 是用来创建一个容器实例的函数。

参数 accumulator 是将Stream中的一个元素合并到容器中的函数。

参数 combiner 是将两个容器归并为一个容器的函数,只在并行执行的时候用到。

在并行执行的场景下,我们有一些额外的要求:

1. combiner函数满足结合律

2. 要求combiner 和 accumulator 是兼容的(compatible),即对于任意的r和t, combiner.accept(r, accumulator.accept(supplier.get(), t)) == accumulator.accept(r, t)

结语

Stream 提供了非常方便的并行编程API,但它还是存在很多问题,非常容易踩坑。

其中,最为人诟病的是它的不可控性。因为 Parallel Stream 的底层是基于 ForkJoinPool ,而 ForkJoinPool 的工作线程数是在虚拟机启动时指定的,如果 Stream 并行执行的任务数量过多或耗时过多,甚至会影响应用程序中其它使用 ForkJoinPool 的功能。

总的来说,除非你非常了解你正在做的事情,否则不要使用 Stream 的并行编程API 。取而代之,我们可以直接使用Java中多线程技术(例如线程池)来处理。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzpxw.html