同一性:对于任何一条并行线路来说 ,需要满足a == combiner.apply(a, supplier.get())。举例来说:
(List list1,List list2 -> {list1.addAll(list2);return list1})
结合性: 下方有举例。
Collector收集器的实现源码详解
/**
* A <a href="package-summary.html#Reduction">mutable reduction operation</a> that
* accumulates input elements into a mutable result container, optionally transforming
* the accumulated result into a final representation after all input elements
* have been processed. Reduction operations can be performed either sequentially
* or in parallel.
Collector作为一个接口。它是一个可变的汇聚操作,将输入元素累计到一个可变的结果容器中;它会在所有元素都处理 完毕后将累计的结果作为一个最终的表示(这是一个可选操作);它支持串行与并行两种方式执行。(并不是说并行一定比串行快。)
* <p>Examples of mutable reduction operations include:
* accumulating elements into a {@code Collection}; concatenating
* strings using a {@code StringBuilder}; computing summary information about
* elements such as sum, min, max, or average; computing "pivot table" summaries
* such as "maximum valued transaction by seller", etc. The class {@link Collectors}
* provides implementations of many common mutable reductions.
Collects本身提供了关于Collectoe的常见汇聚实现,Collectors本身实际上是一个工厂。
* <p>A {@code Collector} is specified by four functions that work together to
* accumulate entries into a mutable result container, and optionally perform
* a final transform on the result. They are: <ul>
*
<li>creation of a new result container ({@link #supplier()})</li>
*
<li>incorporating a new data element into a result container ({@link #accumulator()})</li>
*
<li>combining two result containers into one ({@link #combiner()})</li>
*
<li>performing an optional final transform on the container ({@link #finisher()})</li>
* </ul>
Collector 包含了4个参数
* <p>Collectors also have a set of characteristics, such as
* {@link Characteristics#CONCURRENT}, that provide hints that can be used by a
* reduction implementation to provide better performance.
*
* <p>A sequential implementation of a reduction using a collector would
* create a single result container using the supplier function, and invoke the
* accumulator function once for each input element. A parallel implementation
* would partition the input, create a result container for each partition,
* accumulate the contents of each partition into a subresult for that partition,
* and then use the combiner function to merge the subresults into a combined
* result.
举例说明:
1,2, 3, 4
四个部分结果。
1,2 -》 5
5,3 -》 6
6,4 -》 6
### 同一性和结合性的解析:
* <p>To ensure that sequential and parallel executions produce equivalent
* results, the collector functions must satisfy an <em>identity</em> and an
* <a href="package-summary.html#Associativity">associativity</a> constraints.
为了确保串行和并行的结果一致,需要进行额外的处理。必须要满足两个约束。
identity 同一性
associativity 结合性
* <p>The identity constraint says that for any partially accumulated result,
* combining it with an empty result container must produce an equivalent
* result. That is, for a partially accumulated result {@code a} that is the
* result of any series of accumulator and combiner invocations, {@code a} must
* be equivalent to {@code combiner.apply(a, supplier.get())}.
同一性: 对于任何一条并行线路来说,需要满足a == combiner.apply(a, supplier.get())
* <p>The associativity constraint says that splitting the computation must
* produce an equivalent result. That is, for any input elements {@code t1}
* and {@code t2}, the results {@code r1} and {@code r2} in the computation
* below must be equivalent:
* <pre>{@code
*
A a1 = supplier.get();
串行:
*
accumulator.accept(a1, t1); 第一个参数,每次累加的中间结果。 第二个参数,下一个要处理的参数
*
accumulator.accept(a1, t2);
*
R r1 = finisher.apply(a1); // result without splitting
*
*
A a2 = supplier.get();
并行:
*
accumulator.accept(a2, t1); 第一个参数,每次累加的中间结果。 第二个参数,下一个要处理的参数
*
A a3 = supplier.get();
*
accumulator.accept(a3, t2);
*
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
* } </pre>
结合性: 如上例。 最终要求 r1 == r2
* <p>For collectors that do not have the {@code UNORDERED} characteristic,
* two accumulated results {@code a1} and {@code a2} are equivalent if
* {@code finisher.apply(a1).equals(finisher.apply(a2))}. For unordered
* collectors, equivalence is relaxed to allow for non-equality related to
* differences in order. (For example, an unordered collector that accumulated
* elements to a {@code List} would consider two lists equivalent if they
* contained the same elements, ignoring order.)
对于无序的收集器来说,等价性就被放松了,会考虑到顺序上的区别对应的不相等性。
两个集合中包含了相同的元素,但是忽略了顺序。这种情况下两个的集合也是等价的。
### collector复合与注意事项:
* <p>Libraries that implement reduction (汇聚) based on {@code Collector}, such as
* {@link Stream#collect(Collector)}, must adhere to the following constraints:
* <ul>
*
<li>The first argument passed to the accumulator function, both
*
arguments passed to the combiner function, and the argument passed to the
*
finisher function must be the result of a previous invocation of the
*
result supplier, accumulator, or combiner functions.</li>
*
<li>The implementation should not do anything with the result of any of
*
the result supplier, accumulator, or combiner functions other than to
*
pass them again to the accumulator, combiner, or finisher functions,
*
or return them to the caller of the reduction operation.</li>
具体的实现来说,不应该对中间返回的结果进行额外的操作。除了最终的返回的结果。
*
<li>If a result is passed to the combiner or finisher
*
function, and the same object is not returned from that function, it is
*
never used again.</li>
如果一个结果被传递给combiner or finisher,但是并没有返回一个你传递的对象,说明你生成了一个新的结果或者创建了新的对象。这个结果就不会再被使用了。
*
<li>Once a result is passed to the combiner or finisher function, it
*
is never passed to the accumulator function again.</li>
一旦一个结果被传递给了 combiner or finisher 函数,他就不会再被传递给了accumulator函数了。
*
<li>For non-concurrent collectors, any result returned from the result
*
supplier, accumulator, or combiner functions must be serially
*
thread-confined. This enables collection to occur in parallel without
*
the {@code Collector} needing to implement any additional synchronization.
*
The reduction implementation must manage that the input is properly
*
partitioned, that partitions are processed in isolation, and combining
*
happens only after accumulation is complete.</li>
线程和线程之间的处理都是独立的,最终结束时再进行合并。
*
<li>For concurrent collectors, an implementation is free to (but not
*
required to) implement reduction concurrently. A concurrent reduction
*
is one where the accumulator function is called concurrently from
*
multiple threads, using the same concurrently-modifiable result container,
*
rather than keeping the result isolated during accumulation.
*
A concurrent reduction should only be applied if the collector has the
*
{@link Characteristics#UNORDERED} characteristics or if the
*
originating data is unordered.</li>
如果不是并发收集器,4个线程会生成4个中间结果。
是并发收集器的话,4个线程会同时调用一个结果容器。
* </ul>
*
* <p>In addition to the predefined implementations in {@link Collectors}, the
* static factory methods {@link #of(Supplier, BiConsumer, BinaryOperator, Characteristics...)}
* can be used to construct collectors. For example, you could create a collector
* that accumulates widgets into a {@code TreeSet} with:
*
* <pre>{@code
*
Collector<Widget, ?, TreeSet<Widget>> intoSet =
*
Collector.of(TreeSet::new, TreeSet::add,
*
(left, right) -> { left.addAll(right); return left; });
* }</pre>
通过Collector.of(传进一个新的要操作的元素,结果容器处理的步骤,多线程处理的操作)
将流中的每个Widget 添加到TreeSet中
* (This behavior is also implemented by the predefined collector
* {@link Collectors#toCollection(Supplier)}).
*
* @apiNote
* Performing a reduction operation with a {@code Collector} should produce a
* result equivalent to:
* <pre>{@code
*
R container = collector.supplier().get();
*
for (T t : data)
*
collector.accumulator().accept(container, t);
*
return collector.finisher().apply(container);
* }</pre>
api的说明: collector的finisher汇聚的实现过程。
* <p>However, the library is free to partition the input, perform the reduction
* on the partitions, and then use the combiner function to combine the partial
* results to achieve a parallel reduction. (Depending on the specific reduction
* operation, this may perform better or worse, depending on the relative cost
* of the accumulator and combiner functions.)
性能取决于accumulator and combiner的代价。 也就是说 并行流 并不一定比串行流效率高。
* <p>Collectors are designed to be <em>composed</em>; many of the methods
* in {@link Collectors} are functions that take a collector and produce
* a new collector. For example, given the following collector that computes
* the sum of the salaries of a stream of employees:
* <pre>{@code
*
Collector<Employee, ?, Integer> summingSalaries
*
= Collectors.summingInt(Employee::getSalary))
* }</pre>
搜集器是可以组合的: take a collector and produce a new collector.
搜集器的实现过程。 如 员工的工资的求和。
* If we wanted to create a collector to tabulate the sum of salaries by
* department, we could reuse the "sum of salaries" logic using
* {@link Collectors#groupingBy(Function, Collector)}:
* <pre>{@code
*
Collector<Employee, ?, Map<Department, Integer>> summingSalariesByDept
*
= Collectors.groupingBy(Employee::getDepartment, summingSalaries);
* }</pre>
如果我们想要新建一个搜集器,我们可以复用之前的搜集器。
实现过程。
* @see Stream#collect(Collector)
* @see Collectors
*
* @param <T> the type of input elements to the reduction operation
<T> 代表 流中的每一个元素的类型。
* @param <A> the mutable accumulation type of the reduction operation (often
*
hidden as an implementation detail)
<A> 代表 reduction操作的可变容器的类型。表示中间操作生成的结果的类型(如ArrayList)。
* @param <R> the result type of the reduction operation
<R> 代表 结果类型
* @since 1.8
*/
public interface Collector<T, A, R>{
/**
* A function that creates and returns a new mutable result container.
* A就代表每一次返回结果的类型
* @return a function which returns a new, mutable result container
*/
Supplier<A> supplier(); // 提供一个结果容器
/**
* A function that folds a value into a mutable result container.
* A代表中间操作返回结果的类型。 T是下一个代操作的元素的类型。
* @return a function which folds a value into a mutable result container
*/
BiConsumer<A, T> accumulator(); //不断的向结果容器中添加元素。
/**
* A function that accepts two partial results and merges them. The
* combiner function may fold state from one argument into the other and
* return that, or may return a new result container.
* A 中间操作返回结果的类型。
* @return a function which combines two partial results into a combined
* result
*/
BinaryOperator<A> combiner(); //在多线程中 合并 部分结果。
/**
和并行流紧密相关的
接收两个结果,将两个部分结果合并到一起。
combiner函数,有4个线程同时去执行,那么就会有生成4个部分结果。
举例说明:
1,2, 3, 4
四个部分结果。
1,2 -》 5
5,3 -》 6
6,4 -》 6
1,2合并返回5 属于return a new result container.
6,4合并返回6,属于The combiner function may fold state from one argument into the other and return that。
*/
/**
* Perform the final transformation from the intermediate accumulation type
* {@code A} to the final result type {@code R}.
*R 是最终返回结果的类型。
* <p>If the characteristic {@code IDENTITY_TRANSFORM} is
* set, this function may be presumed to be an identity transform with an
* unchecked cast from {@code A} to {@code R}.
*
* @return a function which transforms the intermediate result to the final
* result
*/
Function<A, R> finisher(); // 合并中间的值,给出返回值。
/**
* Returns a {@code Set} of {@code Collector.Characteristics} indicating
* the characteristics of this Collector. This set should be immutable.
*
* @return an immutable set of collector characteristics
*/
Set<Characteristics> characteristics(); //特征的集合
/**
* Returns a new {@code Collector} described by the given {@code supplier},
* {@code accumulator}, and {@code combiner} functions. The resulting
* {@code Collector} has the {@code Collector.Characteristics.IDENTITY_FINISH}
* characteristic.
*
* @param supplier The supplier function for the new collector
* @param accumulator The accumulator function for the new collector
* @param combiner The combiner function for the new collector
* @param characteristics The collector characteristics for the new
*
collector
* @param <T> The type of input elements for the new collector
* @param <R> The type of intermediate accumulation result, and final result,
*
for the new collector
* @throws NullPointerException if any argument is null
* @return the new {@code Collector}
*/
public static<T, R> Collector<T, R, R> of(Supplier<R> supplier,
BiConsumer<R, T> accumulator,
BinaryOperator<R> combiner,
Characteristics... characteristics) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
Objects.requireNonNull(characteristics);
Set<Characteristics> cs = (characteristics.length == 0)
? Collectors.CH_ID
: Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
characteristics));
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
}
/**
* Returns a new {@code Collector} described by the given {@code supplier},
* {@code accumulator}, {@code combiner}, and {@code finisher} functions.
*
* @param supplier The supplier function for the new collector
* @param accumulator The accumulator function for the new collector
* @param combiner The combiner function for the new collector
* @param finisher The finisher function for the new collector
* @param characteristics The collector characteristics for the new
*
collector
* @param <T> The type of input elements for the new collector
* @param <A> The intermediate accumulation type of the new collector
* @param <R> The final result type of the new collector
* @throws NullPointerException if any argument is null
* @return the new {@code Collector}
*/
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A, R> finisher,
Characteristics... characteristics) {
Objects.requireNonNull(supplier);
Objects.requireNonNull(accumulator);
Objects.requireNonNull(combiner);
Objects.requireNonNull(finisher);
Objects.requireNonNull(characteristics);
Set<Characteristics> cs = Collectors.CH_NOID;
if (characteristics.length > 0) {
cs = EnumSet.noneOf(Characteristics.class);
Collections.addAll(cs, characteristics);
cs = Collections.unmodifiableSet(cs);
}
return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
}
/**
* Characteristics indicating properties of a {@code Collector}, which can
* be used to optimize reduction implementations.
*/
enum Characteristics { // 特征
/**
* Indicates that this collector is <em>concurrent</em>, meaning that
* the result container can support the accumulator function being
* called concurrently with the same result container from multiple
* threads.
* 并发的,同一个结果容器可以由多个线程同时调用。
* <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
* then it should only be evaluated concurrently if applied to an
* unordered data source.
如果不是UNORDERED。只能用于无序的数据源。
如果不加CONCURRENT,还是可以操作并行流。但是操作的不是一个结果容器,而是多个结果容器。则需要调用finisher.
如果加了CONCURRENT,则是多个线程操作同一结果容器。 则无需调用finisher.
*/
CONCURRENT,
/**
* Indicates that the collection operation does not commit to preserving
* the encounter order of input elements. (This might be true if the
* result container has no intrinsic order, such as a {@link Set}.)
收集操作并不保留顺序。
*/
UNORDERED,
/**
* Indicates that the finisher function is the identity function and
* can be elided. If set, it must be the case that an unchecked cast
* from A to R will succeed.
如果用和这个参数,表示 Finish函数就是 identity函数。 并且转换一定要是成功的。
*/
IDENTITY_FINISH
}
}
Java8(4)(五)收集器比较器用法详解及源码剖析
收集器用法详解与多级分组和分区
为什么在collectors类中定义一个静态内部类?
static class CollectorImpl<T, A, R> implements Collector<T, A, R>
设计上,本身就是一个辅助类,是一个工厂。作用是给开发者提供常见的收集器实现。提供的方法都是静态方法,可以直接调用。