对encounter order的有序性和无序性,示例代码如下:
public static void unorderedExample() { // 我们用 TreeMap 来做实验,因为 ArrayList 的特殊性,很难展示 unordered 的特性 // TreeSet 中的元素是按从小到大排序的,即 [-7, -3, 1, 5, 12] TreeSet<Integer> set = new TreeSet<>(Arrays.asList(1, 12, 5, -7, -3)); // 按 encounter order 打印 set,输出为:-7, -3, 1, 5, 12 System.out.println("The encounter order of set: "); set.stream().forEachOrdered(s -> System.out.print(s + " ")); System.out.println(); // TreeSet 是有序的,所以来自 TreeSet 的 Stream 也是有序的 // 当 Stream 是有序时,执行操作 limit(2) ,不管是串行还是并行,也不管执行多少次,结果都是前两位数字 [-7, -3] System.out.println("Limit ordered Stream: "); set.stream().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " ")); System.out.println(); // 我们使用 unordered 方法将 Stream 转换为无序的。 // 当 Stream 是无序时,并行执行操作 limit(2) ,会发现执行多次时,输出的数字是不一样的(不确定性) System.out.println("Limit unordered Stream: "); System.out.print("first time: "); set.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " ")); System.out.println(); System.out.print("second time: "); set.stream().unordered().parallel().limit(2).forEachOrdered(s -> System.out.print(s + " ")); System.out.println(); }上述示例代码的输出类似:
The encounter order of set: -7 -3 1 5 12 Limit ordered Stream: -7 -3 Limit unordered Stream: first time: -3 5 second time: 5 12大家可以仔细体会。欢迎加群讨论!!!
纯函数操作回顾本系列文章的第一篇,纯函数(purely function)指的是它不会改变函数以外的其它状态,换而言之,即不会改变在该函数之外定义的变量值。纯函数不会导致“副作用(side-effects)。
在Stream的并行编程中,纯函数操作非常关键,否则我们依然需要考虑线程安全的问题。
举例说明:
public static void unsafeParallelOperation() { List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong"); // "副作用" 导致的线程不安全问题 ArrayList<String> results = new ArrayList<>(); provinces.parallelStream() // 过滤掉以 G 开头的省份 .filter(s -> !s.startsWith("G")) // 在 lambda表达式中修改了 results 的值, // 说明了 "s -> results.add(s)" 并非一个纯函数, // 带来了不必要的 "副作用", // 在并行执行时,会导致线程不安全的问题。 .forEach(s -> results.add(s)); System.out.println(results); }上述示例代码存在线程不安全的问题 —— 多个线程会同时修改 ArrayList 类型的 results ,我们需要对 results 变量加锁。
正确的做法是:
public static void safeParallelOperation() { List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong"); List<String> results = provinces.parallelStream() // 过滤掉以 G 开头的省份 .filter(s -> !s.startsWith("G")) // 没有 "副作用" .collect(Collectors.toList()); System.out.println(results); }通过内置的 Collectors.toList() 方法,就不存在“副作用”,从而也无需考虑线程安全问题。
Collectors与ConcurrentMap回顾一下,在介绍Stream的规约方法 Stream.collect(Collector) 时,我们提到了一个需求场景:将员工按照部门分组。
并行执行的实现代码类似:
public static void groupEmployeesToMap() { List<Employee> employees = Utils.makeEmployees(); Map<String, List<Employee>> map = employees.parallelStream() .collect(Collectors.groupingBy(Employee::getDepartment)); System.out.println(map); }虽然上述代码可以实现功能,但性能可能并不尽如人意,因为在并行执行时,需要将多个中间结果汇总为最终的结果,但合并两个Map,性能损耗可能非常大(例如HashMap,底层是数组+红黑树实现的,合并时复杂度不低)。
自然而然,聪明的Java程序员会想到:如果并行执行得到的中间结果和最终结果都是使用同一个Map实例,那就不需要合并两个Map了,当然,因为并行执行涉及到多线程,因此,这个Map实例要求是线程安全的。典型的线程安全的Map,当然首选ConcurrentHashMap 啦。
这就是Collectors工具类中与ConcurrentMap相关的方法的实现原理,主要包括:
1. toConcurrentMap 系列方法
2. groupingByConcurrent 系列方法