If the CONCURRENT characteristic is absent, the collector can still be used on a parallel stream, but each worker thread fills its own result container, and the partial containers must then be merged (the combiner is invoked) before the finisher produces the final result.
If CONCURRENT is present (and the stream is unordered), multiple threads accumulate into one shared result container, so no merging of separate containers is needed.
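As a hedged sketch of this difference (the word list and class name below are invented purely for illustration), compare the two grouping collectors on a parallel stream:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentMap;
    import java.util.stream.Collectors;

    public class ConcurrentCharacteristicDemo {
        public static void main(String[] args) {
            List<String> words = Arrays.asList("a", "bb", "cc", "ddd", "ee", "f");

            // No CONCURRENT characteristic: each worker thread fills its own HashMap,
            // and the per-thread maps are merged afterwards by the combiner.
            Map<Integer, List<String>> byLength = words.parallelStream()
                    .collect(Collectors.groupingBy(String::length));

            // CONCURRENT (and UNORDERED): every thread accumulates into one shared
            // ConcurrentHashMap, so no per-thread containers need to be merged.
            ConcurrentMap<Integer, List<String>> byLengthConcurrent = words.parallelStream()
                    .collect(Collectors.groupingByConcurrent(String::length));

            System.out.println(byLength);
            System.out.println(byLengthConcurrent);
        }
    }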
Hyper-Threading (HT) is a technology developed by Intel and released in 2002. It was initially applied only to Xeon processors, where it was known as "Super-Threading", and was later brought to the Pentium 4 HT; its early codename was Jackson.
With this technology, Intel exposes two logical threads on a single physical CPU. The later Pentium D did not support hyper-threading, but because it integrated two physical cores it still presented two threads. The future direction of hyper-threading is to keep raising the number of logical threads per processor: the Core i7-6950X that Intel released in 2016 combines a 10-core processor with hyper-threading to present 20 logical threads.
Summary of collectors: an exercise in how the methods of the Collectors class are implemented. A collector always works through an intermediate container, so it is worth summarizing the methods in Collectors properly.
Once you have the prerequisites in place, the harder material starts to feel self-evident.
For the Collectors static factory class, the implementations fall into two cases:
Implemented directly through CollectorImpl.
Implemented through the reducing() method; reducing() itself is in turn implemented through CollectorImpl.
Either way, everything is ultimately implemented through CollectorImpl.
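Before going method by method, here is a minimal sketch (not JDK code; it uses the public Collector.of factory instead of the internal CollectorImpl) of the same four building blocks, supplier, accumulator, combiner and finisher, that every implementation below is assembled from. The collector it builds simply mimics Collectors.joining(", "):

    import java.util.StringJoiner;
    import java.util.stream.Collector;
    import java.util.stream.Stream;

    public class HandRolledCollectorDemo {
        public static void main(String[] args) {
            // The four components mirror the (supplier, accumulator, combiner, finisher)
            // arguments that CollectorImpl receives in the JDK source quoted below.
            Collector<CharSequence, StringJoiner, String> joinWithComma = Collector.of(
                    () -> new StringJoiner(", "),   // supplier: create the intermediate container
                    StringJoiner::add,              // accumulator: fold one element into the container
                    StringJoiner::merge,            // combiner: merge two containers (parallel streams)
                    StringJoiner::toString);        // finisher: convert the container to the final result

            String result = Stream.of("a", "b", "c").collect(joinWithComma);
            System.out.println(result); // a, b, c
        }
    }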
1. toCollection(collectionFactory): collects the elements into whatever collection the supplied factory creates.

    public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
        return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                                   (r1, r2) -> { r1.addAll(r2); return r1; },
                                   CH_ID);
    }

2. toList() is a concrete specialization of the toCollection() pattern, fixed to ArrayList.

    public static <T> Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

3. toSet() is likewise a concrete specialization of the toCollection() pattern, fixed to HashSet.

    public static <T> Collector<T, ?, Set<T>> toSet() {
        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_UNORDERED_ID);
    }

4. joining(): concatenates the elements into a single String. There are two further overloads, one taking a delimiter and one taking delimiter, prefix and suffix.

    public static Collector<CharSequence, ?, String> joining() {
        return new CollectorImpl<CharSequence, StringBuilder, String>(
                StringBuilder::new, StringBuilder::append,
                (r1, r2) -> { r1.append(r2); return r1; },
                StringBuilder::toString, CH_NOID);
    }

    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
        return joining(delimiter, "", "");
    }

    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
                                                             CharSequence prefix,
                                                             CharSequence suffix) {
        return new CollectorImpl<>(
                () -> new StringJoiner(delimiter, prefix, suffix),
                StringJoiner::add, StringJoiner::merge,
                StringJoiner::toString, CH_NOID);
    }

5. mapping(): applies a mapping function to each element before handing it to the downstream collector, adapting a collector of U into a collector of T.

    public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                                          Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                                   (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                   downstream.combiner(), downstream.finisher(),
                                   downstream.characteristics());
    }

For example:

    Map<City, Set<String>> lastNamesByCity =
        people.stream().collect(groupingBy(Person::getCity,
                                           mapping(Person::getLastName, toSet())));

6. collectingAndThen(): after the downstream collector has finished, applies one more transformation to its result.

    public static <T, A, R, RR> Collector<T, A, RR> collectingAndThen(Collector<T, A, R> downstream,
                                                                      Function<R, RR> finisher) {
        Set<Collector.Characteristics> characteristics = downstream.characteristics();
        if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
            if (characteristics.size() == 1)
                characteristics = Collectors.CH_NOID;
            else {
                characteristics = EnumSet.copyOf(characteristics);
                characteristics.remove(Collector.Characteristics.IDENTITY_FINISH);
                // Why is IDENTITY_FINISH removed here? If it were kept, the framework
                // would skip the finisher and hand back the intermediate container
                // directly as the final result.
                characteristics = Collections.unmodifiableSet(characteristics);
            }
        }
        return new CollectorImpl<>(downstream.supplier(),
                                   downstream.accumulator(),
                                   downstream.combiner(),
                                   downstream.finisher().andThen(finisher),
                                   characteristics);
    }

For example:

    List<String> people = people.stream()
            .collect(collectingAndThen(toList(), Collections::unmodifiableList));

7. counting(): counts the elements.

    public static <T> Collector<T, ?, Long> counting() {
        return reducing(0L, e -> 1L, Long::sum);
    }

8. Minimum and maximum: minBy() / maxBy().

    public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.minBy(comparator));
    }

    public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.maxBy(comparator));
    }
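A quick usage sketch of collectors 1 through 8 above (the word list and class name are invented for illustration):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class BasicCollectorsDemo {
        public static void main(String[] args) {
            List<String> words = Arrays.asList("stream", "collector", "reduce", "stream");

            // toSet(): duplicates collapse into a HashSet
            Set<String> unique = words.stream().collect(Collectors.toSet());

            // joining(delimiter, prefix, suffix): a StringJoiner under the hood
            String joined = words.stream().collect(Collectors.joining(", ", "[", "]"));

            // mapping(): adapt each element before handing it to the downstream collector
            Set<Integer> lengths = words.stream()
                    .collect(Collectors.mapping(String::length, Collectors.toSet()));

            // collectingAndThen(): one extra transformation after the downstream collector finishes
            List<String> readOnly = words.stream()
                    .collect(Collectors.collectingAndThen(Collectors.toList(),
                                                          Collections::unmodifiableList));

            // counting() and maxBy(): both built on reducing()
            long count = words.stream().collect(Collectors.counting());
            Optional<String> longest = words.stream()
                    .collect(Collectors.maxBy(Comparator.comparingInt(String::length)));

            System.out.println(unique + " " + joined + " " + lengths + " "
                    + readOnly + " " + count + " " + longest);
        }
    }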
9. summingInt() / summingLong() / summingDouble(): compute a sum.

    public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1], // Why not use a plain 0 as the intermediate state? A number is a
                                  // value: immutable and not shareable by reference. An array is a
                                  // reference type that can be passed around and mutated, so the
                                  // one-element array acts as a tiny mutable container.
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }

    public static <T> Collector<T, ?, Long> summingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[1],
                (a, t) -> { a[0] += mapper.applyAsLong(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }

    public static <T> Collector<T, ?, Double> summingDouble(ToDoubleFunction<? super T> mapper) {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the simple sum used to compute
         * the proper result if the stream contains infinite values of
         * the same sign.
         */
        return new CollectorImpl<>(
                () -> new double[3],
                (a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t));
                            a[2] += mapper.applyAsDouble(t); },
                (a, b) -> { sumWithCompensation(a, b[0]);
                            a[2] += b[2];
                            return sumWithCompensation(a, b[1]); },
                a -> computeFinalSum(a),
                CH_NOID);
    }

10. averagingInt() / averagingLong() / averagingDouble(): compute an average.

    public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[2],
                (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }

    public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[2],
                (a, t) -> { a[0] += mapper.applyAsLong(t); a[1]++; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }

    public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper) {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the number of values seen.
         */
        return new CollectorImpl<>(
                () -> new double[4],
                (a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t));
                            a[2]++;
                            a[3] += mapper.applyAsDouble(t); },
                (a, b) -> { sumWithCompensation(a, b[0]);
                            sumWithCompensation(a, b[1]);
                            a[2] += b[2]; a[3] += b[3];
                            return a; },
                a -> (a[2] == 0) ? 0.0d : (computeFinalSum(a) / a[2]),
                CH_NOID);
    }

11. reducing() in detail.

    public static <T> Collector<T, ?, T> reducing(T identity, BinaryOperator<T> op) {
        return new CollectorImpl<>(
                boxSupplier(identity),
                (a, t) -> { a[0] = op.apply(a[0], t); },
                (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
                a -> a[0],
                CH_NOID);
    }
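A short usage sketch of the numeric collectors and reducing() shown above (the numbers are arbitrary and the class name is invented):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class NumericCollectorsDemo {
        public static void main(String[] args) {
            List<Integer> nums = Arrays.asList(3, 1, 4, 1, 5);

            // summingInt(): accumulates into the int[1] "box" described above
            int sum = nums.stream().collect(Collectors.summingInt(Integer::intValue));

            // averagingInt(): long[2] holds {running sum, element count}
            double avg = nums.stream().collect(Collectors.averagingInt(Integer::intValue));

            // reducing(identity, op): the boxed single-element array plays the same role
            int product = nums.stream().collect(Collectors.reducing(1, (a, b) -> a * b));

            System.out.println(sum + " " + avg + " " + product);
        }
    }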
12. groupingBy(): the grouping methods in detail.

    public static <T, K> Collector<T, ?, Map<K, List<T>>>   // callers never need to care how the intermediate type is handled
    groupingBy(Function<? super T, ? extends K> classifier) {
        return groupingBy(classifier, toList());             // delegates to the two-argument groupingBy()
    }

    /*
     * @param <T> the type of the input elements                 // T: the element type consumed
     * @param <K> the type of the keys                           // K: the type returned by the classifier function
     * @param <A> the intermediate accumulation type of the downstream collector
     * @param <D> the result type of the downstream reduction
     */
    public static <T, K, A, D> Collector<T, ?, Map<K, D>>
    groupingBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
        return groupingBy(classifier, HashMap::new, downstream); // delegates to the three-argument groupingBy()
    }

    // The fully general groupingBy():
    /**
     * Returns a {@code Collector} implementing a cascaded "group by" operation
     * on input elements of type {@code T}, grouping elements according to a
     * classification function, and then performing a reduction operation on
     * the values associated with a given key using the specified downstream
     * {@code Collector}. The {@code Map} produced by the Collector is created
     * with the supplied factory function.
     *
     * <p>The classification function maps elements to some key type {@code K}.
     * The downstream collector operates on elements of type {@code T} and
     * produces a result of type {@code D}. The resulting collector produces a
     * {@code Map<K, D>}.
     *
     * <p>For example, to compute the set of last names of people in each city,
     * where the city names are sorted:
     * <pre>{@code
     *     Map<City, Set<String>> namesByCity
     *         = people.stream().collect(groupingBy(Person::getCity, TreeMap::new,
     *                                              mapping(Person::getLastName, toSet())));
     * }</pre>
     *
     * @implNote
     * The returned {@code Collector} is not concurrent. For parallel stream
     * pipelines, the {@code combiner} function operates by merging the keys
     * from one map into another, which can be an expensive operation. If
     * preservation of the order in which elements are presented to the downstream
     * collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)}
     * may offer better parallel performance.
     *
     * @param <T> the type of the input elements
     * @param <K> the type of the keys
     * @param <A> the intermediate accumulation type of the downstream collector
     * @param <D> the result type of the downstream reduction
     * @param <M> the type of the resulting {@code Map}
     * @param classifier a classifier function mapping input elements to keys
     * @param downstream a {@code Collector} implementing the downstream reduction
     * @param mapFactory a function which, when called, produces a new empty
     *                   {@code Map} of the desired type
     * @return a {@code Collector} implementing the cascaded group-by operation
     *
     * @see #groupingBy(Function, Collector)
     * @see #groupingBy(Function)
     * @see #groupingByConcurrent(Function, Supplier, Collector)
     */
    // Note the @implNote: the returned collector is not concurrent. If element order
    // does not matter, the concurrent grouping method groupingByConcurrent() is
    // recommended instead.
    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(container, t);
        };
        BinaryOperator<Map<K, A>> merger =
                Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner()); // takes two maps, returns one merged map
        @SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory; // a forced cast of the map factory

        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            // IDENTITY_FINISH: the finisher never needs to be applied
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }
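A usage sketch covering all three groupingBy() overloads (the animal names are made up):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class GroupingByDemo {
        public static void main(String[] args) {
            List<String> words = Arrays.asList("ant", "bee", "cat", "bison", "camel");

            // One argument: downstream defaults to toList(), the map defaults to HashMap
            Map<Character, List<String>> byInitial = words.stream()
                    .collect(Collectors.groupingBy(w -> w.charAt(0)));

            // Two arguments: a custom downstream collector
            Map<Character, Long> countByInitial = words.stream()
                    .collect(Collectors.groupingBy(w -> w.charAt(0), Collectors.counting()));

            // Three arguments: also choose the Map implementation (sorted keys here)
            Map<Character, Set<Integer>> lengthsByInitial = words.stream()
                    .collect(Collectors.groupingBy(w -> w.charAt(0), TreeMap::new,
                            Collectors.mapping(String::length, Collectors.toSet())));

            System.out.println(byInitial);
            System.out.println(countByInitial);
            System.out.println(lengthsByInitial);
        }
    }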
13. groupingByConcurrent(): the concurrent grouping method. The precondition for using it is that you do not care about the order of the data.

    /**
     * Returns a concurrent {@code Collector} implementing a cascaded "group by"
     * operation on input elements of type {@code T}, grouping elements
     * according to a classification function, and then performing a reduction
     * operation on the values associated with a given key using the specified
     * downstream {@code Collector}.
     */
    // ConcurrentHashMap is a Map that supports concurrent access.
    public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>>
    groupingByConcurrent(Function<? super T, ? extends K> classifier) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
    }

    public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>>
    groupingByConcurrent(Function<? super T, ? extends K> classifier,
                         Collector<? super T, A, D> downstream) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream);
    }

    public static <T, K, A, D, M extends ConcurrentMap<K, D>>
    Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                            Supplier<M> mapFactory,
                                            Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BinaryOperator<ConcurrentMap<K, A>> merger =
                Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
        Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
        BiConsumer<ConcurrentMap<K, A>, T> accumulator;
        if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
            accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                downstreamAccumulator.accept(resultContainer, t);
            };
        }
        else {
            accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                synchronized (resultContainer) {
                    // A synchronized block: many threads share the same map, but only one
                    // thread at a time may update a given per-key result container.
                    downstreamAccumulator.accept(resultContainer, t);
                }
            };
        }
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
        }
    }
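To see the synchronized branch above in action, here is a hedged sketch (the data range and class name are arbitrary): toList() does not declare CONCURRENT, so although every thread shares one ConcurrentHashMap, each per-key ArrayList is guarded by the synchronized block quoted above.

    import java.util.List;
    import java.util.concurrent.ConcurrentMap;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class GroupingByConcurrentDemo {
        public static void main(String[] args) {
            // All threads share one ConcurrentHashMap; because toList()'s ArrayList is not
            // a CONCURRENT downstream container, updates to each per-key list go through
            // the synchronized block in the JDK source above.
            ConcurrentMap<Integer, List<Integer>> byRemainder = IntStream.rangeClosed(1, 1_000)
                    .boxed()
                    .parallel()
                    .collect(Collectors.groupingByConcurrent(i -> i % 3));

            // Every bucket ends up complete even though the insertion order is unspecified.
            byRemainder.forEach((k, v) -> System.out.println(k + " -> " + v.size()));
        }
    }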
14. partitioningBy(): partitioning in detail.

    public static <T> Collector<T, ?, Map<Boolean, List<T>>>
    partitioningBy(Predicate<? super T> predicate) {
        return partitioningBy(predicate, toList());
    }

    public static <T, D, A> Collector<T, ?, Map<Boolean, D>>
    partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream) {
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Partition<A>, T> accumulator = (result, t) ->
                downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
        BinaryOperator<A> op = downstream.combiner();
        BinaryOperator<Partition<A>> merger = (left, right) ->
                new Partition<>(op.apply(left.forTrue, right.forTrue),
                                op.apply(left.forFalse, right.forFalse));
        Supplier<Partition<A>> supplier = () ->
                new Partition<>(downstream.supplier().get(),
                                downstream.supplier().get());
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
        }
        else {
            Function<Partition<A>, Map<Boolean, D>> finisher = par ->
                    new Partition<>(downstream.finisher().apply(par.forTrue),
                                    downstream.finisher().apply(par.forFalse));
            return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
        }
    }
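And a usage sketch for partitioningBy() (the numbers and class name are made up):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PartitioningByDemo {
        public static void main(String[] args) {
            List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

            // One argument: downstream defaults to toList(), so both the true and the
            // false bucket are Lists held in the internal Partition container.
            Map<Boolean, List<Integer>> evenOdd = nums.stream()
                    .collect(Collectors.partitioningBy(n -> n % 2 == 0));

            // Two arguments: a downstream collector is applied inside each bucket.
            Map<Boolean, Long> evenOddCount = nums.stream()
                    .collect(Collectors.partitioningBy(n -> n % 2 == 0, Collectors.counting()));

            System.out.println(evenOdd);      // {false=[1, 3, 5, 7], true=[2, 4, 6]}
            System.out.println(evenOddCount); // {false=4, true=3}
        }
    }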
The JDK source itself is the model we learn from. The point of going into this much detail is not that you should write these methods yourself; it is to understand exactly how they are implemented internally, so that you can call them with full confidence.
Finally, a small aside: the note on hyper-threading at the top of this section.