Next, let's step through the source code to see how the call proceeds.
    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

A Deep Dive into Custom Collectors and Their Parallel Pitfalls

    // Example requirement: collect the elements of a Set, then enrich the result by wrapping it in a Map.
    // Input:  Set<String>
    // Output: Map<String,String>
    // Sample input:  [hello, world, hello world]
    // Sample output: {hello=hello, world=world, hello world=hello world}
    import java.util.*;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class MySetCollector2<T> implements Collector<T, Set<T>, Map<T, T>> {

        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked!");
            return HashSet::new;
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked!");
            return (set, item) -> {
                set.add(item);
                // Print the current thread on every call; this prints 6 times.
                System.out.println("accumulator : " + set + ", " + Thread.currentThread().getName());
                // This is where the exception comes from:
                // printing the set iterates over it. When one thread modifies a collection while
                // another thread iterates over it, a ConcurrentModificationException is thrown.
                // In a parallel operation, do not do extra work inside the accumulator.
                // Just add the element; do not print the container as well.
            };
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked!");
            // Only called for parallel streams: merges the partial results of the parallel stream.
            return (set1, set2) -> {
                set1.addAll(set2);
                return set1; // return the merged set (the original returned set2, which loses set1's elements)
            };
        }

        @Override
        public Function<Set<T>, Map<T, T>> finisher() {
            System.out.println("finisher invoked!");
            // Skipped when the intermediate type and the result type are the same and IDENTITY_FINISH is set.
            // Here the two types differ, so it is called.
            return set -> {
                Map<T, T> map = new HashMap<>();
                // Map<T, T> map = new TreeMap<>(); // would return a sorted Map directly
                set.forEach(item -> map.put(item, item));
                return map;
            };
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            // Do not set these flags carelessly; understand exactly what each enum constant means.
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));

            // return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
            // With Characteristics.CONCURRENT added, the program sometimes fails and sometimes runs normally:
            // Caused by: java.util.ConcurrentModificationException

            // return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));
            // With Characteristics.IDENTITY_FINISH added, the program fails:
            // Process 'command '/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java'' finished with non-zero exit value 1
            // What IDENTITY_FINISH actually means: the finisher is the identity function, so the cast from
            // the intermediate type to the result type must succeed. If it fails, an exception is thrown
            // (here a ClassCastException, because a HashSet is not a Map).
            // The collector's properties are defined by its Characteristics. Even if the values you
            // declare do not match reality, the framework still acts on them.
        }

        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "hello", "world", "helloworld", "1", "4", "j");
            Set<String> set = new HashSet<>(list);
            System.out.println("set" + set);
            // Map<String, String> collect = set.stream().collect(new MySetCollector2<>());
            Map<String, String> collect = set.parallelStream().collect(new MySetCollector2<>()); // parallel stream
            System.out.println(collect);
        }
    }

Parallel Stream Pitfalls in Detail

Parallel stream with CONCURRENT: every thread writes to the same result container.

    accumulator invoked!
    accumulator : [j], main
    accumulator : [j, hello], main
    accumulator : [helloworld, 4, j, hello], ForkJoinPool.commonPool-worker-2
    accumulator : [helloworld, 1, 4, j, hello], ForkJoinPool.commonPool-worker-2
    accumulator : [helloworld, 1, world, 4, j, hello], ForkJoinPool.commonPool-worker-2

Parallel stream without CONCURRENT: each thread accumulates into its own result container.

    accumulator : [j], main
    accumulator : [helloworld], ForkJoinPool.commonPool-worker-11
    accumulator : [helloworld, 1], ForkJoinPool.commonPool-worker-11
    accumulator : [helloworld, 1, world], ForkJoinPool.commonPool-worker-11
    accumulator : [4], ForkJoinPool.commonPool-worker-9
    accumulator : [j, hello], main

The Characteristics enum in the JDK source, with notes added:

    /**
     * Characteristics indicating properties of a {@code Collector}, which can
     * be used to optimize reduction implementations.
     */
    enum Characteristics { // the collector's traits
        /**
         * Indicates that this collector is <em>concurrent</em>, meaning that
         * the result container can support the accumulator function being
         * called concurrently with the same result container from multiple
         * threads.
         *
         * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
         * then it should only be evaluated concurrently if applied to an
         * unordered data source.
         *
         * Note: concurrent means that multiple threads may call the accumulator
         * on the same result container at the same time. If the collector is
         * not also UNORDERED, it is evaluated concurrently only for an
         * unordered data source.
         * Without CONCURRENT a parallel stream still works, but the threads
         * fill several result containers instead of one, so the combiner must
         * be called to merge them.
         * With CONCURRENT, multiple threads operate on the same result
         * container, so the combiner is not needed.
         */
        CONCURRENT,

        /**
         * Indicates that the collection operation does not commit to preserving
         * the encounter order of input elements.  (This might be true if the
         * result container has no intrinsic order, such as a {@link Set}.)
         *
         * Note: the collection operation does not preserve order.
         */
        UNORDERED,

        /**
         * Indicates that the finisher function is the identity function and
         * can be elided.  If set, it must be the case that an unchecked cast
         * from A to R will succeed.
         *
         * Note: with this flag the finisher is treated as the identity
         * function, the cast must succeed, and the finisher is never called.
         */
        IDENTITY_FINISH
    }

Root cause of the exception: one thread modifies a collection while another thread iterates over it, so the program throws a ConcurrentModificationException.
In a parallel operation, do not perform extra work inside the accumulator. Just add the element; do not print the container as well.
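As one possible way to apply this advice, the sketch below declares CONCURRENT and backs it with a container that really is thread-safe. It is an illustration under assumptions, not the original author's code: the class name `ConcurrentSetCollector` and the helper `collectParallel` are hypothetical, and swapping `HashSet` for `ConcurrentHashMap.newKeySet()` is a choice made here for the example. The accumulator only adds the element and never prints the shared container.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical name: a CONCURRENT-safe variant of MySetCollector2.
public class ConcurrentSetCollector<T> implements Collector<T, Set<T>, Map<T, T>> {

    @Override
    public Supplier<Set<T>> supplier() {
        // Assumption for this sketch: a thread-safe set replaces HashSet,
        // because CONCURRENT lets every thread write to this one container.
        return ConcurrentHashMap::newKeySet;
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        // Only add; no iteration or printing of the shared container here.
        return Set::add;
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        // Used only when partial containers must be merged; returns the merged set.
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Function<Set<T>, Map<T, T>> finisher() {
        // Still called, because IDENTITY_FINISH is not declared.
        return set -> {
            Map<T, T> map = new HashMap<>();
            set.forEach(item -> map.put(item, item));
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(
                EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
    }

    // Hypothetical helper: runs the collector on a parallel stream.
    static Map<String, String> collectParallel(Collection<String> items) {
        return items.parallelStream().collect(new ConcurrentSetCollector<>());
    }

    public static void main(String[] args) {
        Map<String, String> result = collectParallel(
                Arrays.asList("hello", "hello", "world", "helloworld", "1", "4", "j"));
        // Print once, after collection has finished, from a single thread.
        System.out.println(result);
    }
}
```

Each key maps to itself, as in the original example; the order in which entries are printed is not guaranteed. If logging inside the accumulator is really needed, logging only the item and the thread name avoids iterating over a container that other threads may be modifying.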