Most visited

Recently visited

Added in API level 24

java.util.stream

Classes to support functional-style operations on streams of elements, such as map-reduce transformations on collections. For example:
int sum = widgets.stream()
                      .filter(b -> b.getColor() == RED)
                      .mapToInt(b -> b.getWeight())
                      .sum();
 

在这里,我们使用widgets ,一个Collection<Widget> ,作为用于流的源,然后执行在流的过滤器,地图,减少以获得红色小部件的权重的总和。 Summationreduction操作的示例。)

这个包中引入的关键抽象是 StreamIntStreamLongStream ,和DoubleStream超过目的和原始流intlongdouble类型。 流在几个方面与集合有所不同:

Streams can be obtained in a number of ways. Some examples include:

额外的流源可以由第三方库使用 these techniques提供

Stream operations and pipelines

流操作分为中间操作和终端操作,并组合起来形成流管线 流管道由源(例如Collection ,数组,发生器函数或I / O通道)组成; 接着是零个或多个中间操作,例如Stream.filterStream.map ; 以及诸如Stream.forEachStream.reduce的终端操作。

中间操作返回一个新的流。 他们总是懒惰 ; 执行诸如filter()类的中间操作实际上并不执行任何过滤,而是创建一个新的流,该流在遍历时包含与给定谓词相匹配的初始流的元素。 在管道的终端操作被执行之前,流水线源的遍历不会开始。

终端操作(例如Stream.forEachIntStream.sum )可能会遍历流以产生结果或副作用。 终端操作执行后,流管道被视为消耗,不能再使用; 如果您需要再次遍历相同的数据源,则必须返回到数据源以获取新的流。 在几乎所有情况下,终端操作都非常急切 ,在返回之前完成数据源的遍历和管道的处理。 只有终端操作iterator()spliterator()不是; 如果现有操作不足以完成任务,则可以将这些作为“逃生舱”提供,以实现任意客户端控制的管道穿越。

缓慢处理流程可实现显着的效率; 在诸如上述filter-map-sum示例的流水线中,可以将过滤,映射和求和融合为数据上的单次通过,并具有最小的中间状态。 懒惰还可以避免在没有必要时检查所有数据; 对于诸如“查找长度超过1000个字符的第一个字符串”等操作,只需检查足够的字符串即可找到具有所需特性的字符串,而无需检查源中可用的所有字符串。 (当输入流是无限的,而不仅仅是大的时候,这种行为变得更加重要。)

中间业务进一步分为无状态有状态操作。 无状态操作(如filtermap在处理新元素时不会保留先前看到的元素的状态 - 每个元素都可以独立于其他元素上的操作进行处理。 有状态的操作,比如distinctsorted ,可以在处理新元素时合并先前看到的元素的状态。

有状态的操作可能需要在生成结果之前处理整个输入。 例如,只有在查看了流的所有元素之后,才能对排序流产生任何结果。 因此,在并行计算中,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓存重要数据。 只包含无状态中间操作的流水线可以一次处理,无论是顺序处理还是并行处理,只需最少的数据缓冲。

此外,有些操作被认为是短路操作。 如果在呈现无限输入时,中间操作是短路的,则其可能产生有限的流。 终端操作是短路的,如果出现无限输入时,它可能会在有限的时间内终止。 在流水线中进行短路操作是处理无限流在有限时间内正常终止的必要但不充分的条件。

Parallelism

具有显式for-循环的处理元素本质上是串行的。 流通过将计算重新定义为聚合操作的流水线而不是作为每个单独元素的命令操作来促进并行执行。 所有流操作都可以以串行或并行方式执行。 除非明确要求并行性,否则JDK中的流实现会创建串行流。 例如, Collection具有分别产生顺序流和并行流的方法stream()parallelStream() ; 其他流载方法(如range(int, int)产生顺序流,但通过调用它们的parallel()方法可以高效地并行化这些流。 要并行执行之前的“小部件权重总和”查询,我们可以这样做:

int sumOfWeights = widgets.parallelStream().filter(b -> b.getColor() == RED)
                               .mapToInt(b -> b.getWeight())
                               .sum();
 

本例的串行和并行版本之间的唯一区别是创建初始流,使用“ parallelStream() ”而不是“ stream() ”。 当启动终端操作时,根据流调用的流的方向,流管道将按顺序或并行执行。 可以使用isParallel()方法确定串流或并行执行的流是否可以执行,并且可以使用sequential()parallel()操作修改流的方向。 当启动终端操作时,根据流调用的流的模式顺序或并行执行流管道。

除了标识为明确不确定的操作(如 findAny() ,流是顺序执行还是并行执行都不应改变计算结果。

大多数流操作接受描述用户指定行为的参数,这些参数通常是lambda表达式。 为了保持正确的行为,这些行为参数必须是无干扰的 ,并且在大多数情况下必须是无状态的 这些参数总是functional interface的实例,例如Function ,并且通常是lambda表达式或方法引用。

Non-interference

Streams enable you to execute possibly-parallel aggregate operations over a variety of data sources, including even non-thread-safe collections such as ArrayList. This is possible only if we can prevent interference with the data source during the execution of a stream pipeline. Except for the escape-hatch operations iterator() and spliterator(), execution begins when the terminal operation is invoked, and ends when the terminal operation completes. For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.

因此,源流可能不并发的流管道中的行为参数不应该修改流的数据源。 如果行为参数修改或导致修改流的数据源,则说行为参数会干扰非并发数据源。 对所有管道的要求不限于平行管道。 除非流源是并发的,否则在执行流管道期间修改流的数据源可能会导致异常,不正确的答案或不一致的行为。 对于表现良好的流源,可以在终端操作开始之前对源进行修改,这些修改将反映在覆盖元素中。 例如,请考虑以下代码:

List<String> l = new ArrayList(Arrays.asList("one", "two"));
     Stream<String> sl = l.stream();
     l.add("three");
     String s = sl.collect(joining(" "));
 
First a list is created consisting of two strings: "one"; and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal collect operation commenced the result will be a string of "one two three". All the streams returned from JDK collections, and most other JDK classes, are well-behaved in this manner; for streams generated by other libraries, see Low-level stream construction for requirements for building well-behaved streams.

Stateless behaviors

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline. An example of a stateful lambda is the parameter to map() in:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 
Here, if the mapping operation is performed in parallel, the results for the same input could vary from run to run, due to thread scheduling differences, whereas, with a stateless lambda expression the results would always be the same.

还要注意,试图从行为参数访问可变状态会给你提供一个在安全性和性能方面不好的选择; 如果您没有将访问权限同步到该状态,那么您就有数据竞争,因此您的代码已损坏,但如果您确实同步对该状态的访问权限,则可能会导致争用破坏您希望从中受益的并行性。 最好的方法是避免有状态的行为参数来完全流式操作; 通常有一种方法来重构流管道以避免有状态。

Side-effects

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.

如果行为参数确实有副作用,除非明确说明,有没有保证,而在visibility的那些副作用给其他线程,也没有任何保证相同的流管道内的“相同”的元素在不同的操作在同一个线程中执行。 此外,这些影响的排序可能令人惊讶。 即使流水线被限制产生与流源的碰撞顺序一致的结果 (例如, IntStream.range(0,5).parallel().map(x -> x*2).toArray()必须产生[0, 2, 4, 6, 8] ),也不能保证映射器函数应用于单个元素的顺序,或者在什么线程中为给定元素执行任何行为参数。

许多人可能会试图使用副作用的计算可以更安全有效地表达,而不会产生副作用,例如使用reduction而不是可变累加器。 但是,诸如使用println()进行调试的副作用通常是无害的。 少量流操作(例如forEach()peek()只能通过副作用进行操作; 这些应该小心使用。

作为如何将不恰当地使用副作用的流管道转换为不支持的流管道的示例,下面的代码搜索匹配给定正则表达式的字符串流,并将匹配放入列表中。

ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
 
This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
List<String>results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!
 

Ordering

流可能有也可能没有定义的相遇顺序 流是否具有遇到顺序取决于源和中间操作。 某些流源(例如List或阵列)本质上是有序的,而其他(如HashSet )则不是。 某些中间操作(例如sorted() )可能会在其他无序流上施加碰到命令,而其他中间操作可能会呈现无序的有序流,例如unordered() 此外,一些终端操作可能会忽略碰到次序,例如forEach()

如果流是有序的,大多数操作都被限制为按照它们的遇到顺序操作元素; 如果流的源是List含有[1, 2, 3] ,然后执行的结果map(x -> x*2)必须是[2, 4, 6] 但是,如果源没有定义碰到命令,则值[2, 4, 6]任何置换将是有效的结果。

对于顺序流,遇到顺序的存在与否不会影响性能,只会影响确定性。 如果一个流被排序,那么在相同的源上重复执行相同的流管线将产生相同的结果; 如果没有排序,重复执行可能会产生不同的结果。

对于并行流,放宽排序约束有时可以实现更高效的执行。 如果元素的顺序不相关,某些集合操作(例如过滤重复项( distinct() )或分组缩减项( Collectors.groupingBy() ))可以更高效地实现。 类似地,与碰到命令有内在联系的操作(例如limit() )可能需要缓冲来确保正确的排序,从而破坏并行性的好处。 在流有碰到命令的情况下,但用户并不特别关心该碰到命令,显式地用unordered()可以提高一些有状态或终端操作的并行性能。 然而,即使在排序约束下,大多数流管线(例如上面的“块的权重总和”)仍然有效地并行化。

Reduction operations

A reduction operation (also called a fold) takes a sequence of input elements and combines them into a single summary result by repeated application of a combining operation, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. The streams classes have multiple forms of general reduction operations, called reduce() and collect(), as well as multiple specialized reduction forms such as sum(), max(), or count().

当然,这样的操作可以很容易地实现为简单的顺序循环,如下所示:

int sum = 0;
    for (int x : numbers) {
       sum += x;
    }
 
However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction "more abstract" -- it operates on the stream as a whole rather than individual elements -- but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless. For example, given a stream of numbers for which we want to find the sum, we can write:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
 
or:
int sum = numbers.stream().reduce(0, Integer::sum);
 

这些还原操作可以平行运行,几乎不需要任何修改:

int sum = numbers.parallelStream().reduce(0, Integer::sum);
 

因为实现可以并行操作数据的子集,所以减少并行,然后结合中间结果得到最终的正确答案。 (即使语言具有“并行for-each”构造,变异积累方法仍然需要开发人员为共享累加变量sum提供线程安全更新,并且所需的同步可能会消除并行性带来的任何性能增益。)相反,使用reduce()可以消除并行缩减操作的所有负担,并且该库可以提供高效的并行实现,而无需额外的同步。

前面显示的“小部件”示例显示了减少操作如何与其他操作结合使用以用批量操作替换循环。 如果widgets是包含getWeight方法的Widget对象的集合, Widget我们可以找到最重的小部件:

OptionalInt heaviest = widgets.parallelStream()
                                   .mapToInt(Widget::getWeight)
                                   .max();
 

在其更一般的形式,一个 reduce上类型的元素的操作 <T>得到类型的结果 <U>需要三个参数:

<U> U reduce(U identity,
              BiFunction<U, ? super T, U> accumulator,
              BinaryOperator<U> combiner);
 
Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element, and produces a new partial result. The combiner function combines two partial results to produce a new partial result. (The combiner is necessary in parallel reductions, where the input is partitioned, a partial accumulation computed for each partition, and then the partial results are combined to produce a final result.)

更正式地说, identity值必须是组合函数的标识 这意味着,对于所有ucombiner.apply(identity, u)等于u 此外, combiner功能必须是associative ,并必须与兼容accumulator功能:对所有utcombiner.apply(u, accumulator.apply(identity, t))必须equals()accumulator.apply(u, t)

三参数形式是双参数形式的泛化,将映射步骤合并到累积步骤中。 我们可以使用更一般的形式重新构造简单的权重总和示例,如下所示:

int sumOfWeights = widgets.stream()
                               .reduce(0,
                                       (sum, b) -> sum + b.getWeight())
                                       Integer::sum);
 
though the explicit map-reduce form is more readable and therefore should usually be preferred. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.

Mutable reduction

A mutable reduction operation accumulates input elements into a mutable result container, such as a Collection or StringBuilder, as it processes the elements in the stream.

如果我们想要获取一串字符串并将它们连接成一个单一的长字符串,我们 可以通过普通的缩减 实现这一点:

String concatenated = strings.reduce("", String::concat)
 

我们会得到理想的结果,甚至可以并行工作。 但是,我们可能不会对表演感到高兴! 这样的实现会进行大量的字符串复制,并且运行时间将会是字符数量的O(n ^ 2) StringBuilder方法是将结果累积到StringBuilder ,这是一个用于累积字符串的可变容器。 我们可以使用相同的技术来平行化可变缩减,就像我们使用普通缩减一样。

可变缩减操作称为collect() ,因为它将所需结果一起收集到结果容器(如Collection collect操作需要三个功能:构造结果容器的新实例的供应者函数,将输入元素并入结果容器的累加器函数以及将一个结果容器的内容合并到另一个结果容器的组合函数。 这种形式与普通缩减的一般形式非常相似:

<R> R collect(Supplier<R> supplier,
               BiConsumer<R, ? super T> accumulator,
               BiConsumer<R, R> combiner);
 

reduce() ,以这种抽象方式表达collect的好处是,它可以直接适用于并行化:只要累积和组合函数满足适当的要求,我们就可以并行累积部分结果并将它们组合起来。 例如,要将流中元素的字符串表示形式收集到ArrayList ,我们可以为每种形式编写明显的顺序:

ArrayList<String> strings = new ArrayList<>();
     for (T element : stream) {
         strings.add(element.toString());
     }
 
Or we could use a parallelizable collect form:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
                                                (c, e) -> c.add(e.toString()),
                                                (c1, c2) -> c1.addAll(c2));
 
or, pulling the mapping operation out of the accumulator function, we could express it more succinctly as:
List<String> strings = stream.map(Object::toString)
                                  .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 
Here, our supplier is just the ArrayList constructor, the accumulator adds the stringified element to an ArrayList, and the combiner simply uses addAll to copy the strings from one container into the other.

collect的三个方面 - 供应商,累加器和组合器 - 紧密耦合。 我们可以使用Collector的抽象来捕捉所有三个方面。 以上用于将字符串收集到List可以使用标准Collector重写为:

List<String> strings = stream.map(Object::toString)
                                  .collect(Collectors.toList());
 

将可变减少包装到收集器中具有另一个优点:可组合性。 Collectors包含许多用于收集器的预定义工厂,包括将收集器转换为另一个收集器的组合器。 例如,假设我们有一个收集者计算员工流薪的总和,如下所示:

Collector<Employee, ?, Integer> summingSalaries
         = Collectors.summingInt(Employee::getSalary);
 
(The ? for the second type parameter merely indicates that we don't care about the intermediate representation used by this collector.) If we wanted to create a collector to tabulate the sum of salaries by department, we could reuse summingSalaries using groupingBy:
Map<Department, Integer> salariesByDept
         = employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
                                                            summingSalaries));
 

与常规还原操作一样,只有在满足适当条件的情况下才能并行执行collect()操作。 对于任何部分累积结果,将其与空结果容器组合必须产生相同的结果。 也就是说,部分累加结果p即任何系列累加器和组合器的调用的结果, p必须等于combiner.apply(p, supplier.get())

此外,然而,计算是分裂的,它必须产生相等的结果。 对于任何输入元素t1t2 ,以下计算中的结果r1r2必须是等效的:

A a1 = supplier.get();
     accumulator.accept(a1, t1);
     accumulator.accept(a1, t2);
     R r1 = finisher.apply(a1);  // result without splitting

     A a2 = supplier.get();
     accumulator.accept(a2, t1);
     A a3 = supplier.get();
     accumulator.accept(a3, t2);
     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
 

这里,等同通常意味着根据equals(Object) 但在某些情况下,可以放宽等同性来解决秩序的差异。

Reduction, concurrency, and ordering

With some complex reduction operations, for example a collect() that produces a Map, such as:
Map<Buyer, List<Transaction>> salesByBuyer
         = txns.parallelStream()
               .collect(Collectors.groupingBy(Transaction::getBuyer));
 
it may actually be counterproductive to perform the operation in parallel. This is because the combining step (merging one Map into another by key) can be expensive for some Map implementations.

然而,假设在这个缩减中使用的结果容器是可同时修改的集合 - 例如ConcurrentHashMap 在这种情况下,累加器的并行调用实际上可以将它们的结果并发地存放到相同的共享结果容器中,而不需要组合器合并不同的结果容器。 这可能会提高并行执行性能。 我们称之为并发减少。

Collector支持并发还原标有CONCURRENT特性。 然而,并发收集也有不利之处。 如果多个线程同时将结果存放到共享容器中,则结果存放的顺序是非确定性的。 因此,只有在排序对正在处理的流不重要的情况下才能同时进行减少。 collect(Collector)实现将只执行并发减少

You can ensure the stream is unordered by using the unordered() method. For example:
Map<Buyer, List<Transaction>> salesByBuyer
         = txns.parallelStream()
               .unordered()
               .collect(groupingByConcurrent(Transaction::getBuyer));
 
(where groupingByConcurrent(Function ) is the concurrent equivalent of groupingBy).

请注意,如果给定键的元素按其出现在源中的顺序出现很重要,那么我们不能使用并发减少,因为排序是并发插入的一个伤害。 然后,我们将受到限制,要么实施顺序裁减,要么实施基于合并的并行裁减。

Associativity

An operator or function op is associative if the following holds:
(a op b) op c == a op (b op c)
 
The importance of this to parallel evaluation can be seen if we expand this to four terms:
a op b op c op d == (a op b) op (c op d)
 
So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results.

关联操作的例子包括数字加法,最小值和最大值以及字符串连接。

Low-level stream construction

So far, all the stream examples have used methods like stream() or stream(Object[]) to obtain a stream. How are those stream-bearing methods implemented?

StreamSupport有许多用于创建流的低级方法,全部使用某种形式的Spliterator 分割器是Iterator的并行Iterator ; 它描述了一个(可能是无限的)元素集合,支持顺序地前进,批量遍历,并将一部分输入拆分为另一个可以并行处理的分割器。 在最底层,所有的流都是由分裂者驱动的。

在实现分割器时有许多实现选择,几乎所有这些都是实现的简单性和使用分割器的流的运行时性能之间的折衷。 创建分割器最简单但性能最差的方法是使用spliteratorUnknownSize(java.util.Iterator, int)从迭代器中创建一个分割器。 虽然这样的分割器可以工作,但它可能会提供较差的并行性能,因为我们已经丢失了大小信息(底层数据集有多大),并且受限于简单的分割算法。

更高质量的分割器将提供均衡和已知尺寸的分割,准确的尺寸信息以及分割器或可供实现用于优化执行的数据的其他 characteristics分割器。

用于可变数据源的Spliterator还有一个额外的挑战; 绑定到数据的时间,因为数据可能会在创建分割器的时间和流式管道执行的时间之间发生变化。 理想情况下,流的分割器会报告IMMUTABLECONCURRENT的特征; 如果不是,应该是late-binding 如果源不能直接提供推荐的分割器,则它可能使用Supplier间接提供分割器,并通过Supplier接收版本stream()构建流。 分流器仅在流管道的终端操作开始后才从供应商处获得。

这些要求显着减少了流源的突变与流管线的执行之间潜在干扰的范围。 基于具有所需特征的分割器的流或使用基于供应商的工厂形式的分流器在开始终端操作之前不受数据源修改的影响(假设流操作的行为参数满足非流行操作的要求标准,干扰和无状态)。 详情请参阅Non-Interference

Interfaces

BaseStream<T, S extends BaseStream<T, S>> 流的基本接口,它是支持顺序和并行聚合操作的元素序列。
Collector<T, A, R> 将输入元素累加到可变结果容器中的一个 mutable reduction operation ,可选地在所有输入元素已被处理之后将累积结果变换成最终表示。
DoubleStream 支持顺序和并行聚合操作的一系列原始双值元素。
DoubleStream.Builder DoubleStream可变建设者。
IntStream 一系列支持顺序和并行聚合操作的基本int值元素。
IntStream.Builder IntStream可变建设者。
LongStream 一系列支持顺序和并行聚合操作的原始长值元素。
LongStream.Builder LongStream可变建设者。
Stream<T> 一系列支持顺序和并行集合操作的元素。
Stream.Builder<T> Stream可变建设者。

Classes

Collectors 实现各种有用的缩减操作的实施例 Collector ,诸如将元素累积到集合中,根据各种准则来概括元素等等。
StreamSupport 用于创建和操作流的低级实用程序方法。

Enums

Collector.Characteristics 指示 Collector属性的特性,可用于优化缩减实现。

Hooray!