复制stream以避免“stream已经被操作或closures”(Java 8)
我想复制一个Java 8stream,以便我可以处理它两次。 我可以collect
一个列表,并从中获得新的stream;
// doSomething() returns a stream List<A> thing = doSomething().collect(toList()); thing.stream()... // do stuff thing.stream()... // do other stuff
但我认为应该有一个更高效/优雅的方式。 有没有办法复制stream而不把它变成一个集合?
实际上,我正在处理任意一个stream,所以我们需要先处理左侧的投影,然后再移动到正确的投影中,然后用另一种方式处理。 有点像这样(到目前为止,我不得不使用toList
技巧)。
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList()); Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left()); failures.forEach(failure -> ... ); Stream<A> successes = results.stream().flatMap(either -> either.right()); successes.forEach(success -> ... );
我认为你对效率的假设是倒退的。 如果您只使用一次数据,则可以获得巨大的效率回报,因为您无需存储数据,而stream为您提供了强大的“循环融合”优化function,可以让您通过pipe道有效地传输整个数据。
如果您想要重复使用相同的数据,那么根据定义,您必须生成两次(确定性地)或存储它。 如果它已经在一个集合中,好极了; 那么迭代两次便宜。
我们在“叉stream”的devise中做过实验。 我们发现,这种支持有实际成本。 它以牺牲这种罕见的情况为代价来承担一般情况(使用一次)。 最大的问题是处理“两条pipe道不以相同速率消耗数据时会发生什么”。 现在你又回到了缓冲区。 这是一个显然没有发挥重要作用的特征。
如果您想要重复操作相同的数据,请将其存储起来,或将您的操作组织为消费者,然后执行以下操作:
stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
您也可以查看RxJava库,因为它的处理模式更适合这种“stream式分岔”。
使用java.util.function.Supplier 。
来自http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
重用stream
Java 8stream不能被重用。 一旦你打电话给任何terminal操作streamclosures:
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception
anyMatch在同一个stream之后调用noneMatch导致以下exception:
java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
为了克服这个限制,我们必须为每个我们想要执行的terminal操作创build一个新的stream链,例如我们可以创build一个stream供应商来构build一个新的stream,其中已经build立了所有的中间操作:
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok
每次调用
get()
构造一个新的stream,我们将保存该stream以调用所需的terminal操作。
我们为jOOλ中的stream实现了一个duplicate()
方法,我们创build了一个开源库来改进jOOQ的集成testing。 本质上,你可以写:
Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
在内部,有一个缓冲区存储从一个stream中消耗的所有值,但不是从另一个stream消耗的值。 如果你的两个stream以相同的速度消耗,那么这可能是高效的, 如果你可以忍受线程安全的缺乏 。
algorithm的工作原理如下:
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) { final List<T> gap = new LinkedList<>(); final Iterator<T> it = stream.iterator(); @SuppressWarnings("unchecked") final Iterator<T>[] ahead = new Iterator[] { null }; class Duplicate implements Iterator<T> { @Override public boolean hasNext() { if (ahead[0] == null || ahead[0] == this) return it.hasNext(); return !gap.isEmpty(); } @Override public T next() { if (ahead[0] == null) ahead[0] = this; if (ahead[0] == this) { T value = it.next(); gap.offer(value); return value; } return gap.poll(); } } return tuple(seq(new Duplicate()), seq(new Duplicate())); }
更多源代码在这里
Tuple2
可能像你的Pair
types,而Seq
是Stream
的一些增强。
您可以创build一个可运行的stream(例如):
results.stream() .flatMap(either -> Stream.<Runnable> of( () -> failure(either.left()), () -> success(either.right()))) .forEach(Runnable::run);
如果failure
和success
是应用的操作。 然而,这将会创build相当多的临时对象,并且可能不会比从一个集合和stream/重复两次开始更有效率。
处理元素多次的另一种方法是使用Stream.peek(Consumer) :
doSomething().stream() .peek(either -> handleFailure(either.left())) .foreach(either -> handleSuccess(either.right()));
peek(Consumer)
可以根据需要多次链接。
doSomething().stream() .peek(element -> handleFoo(element.foo())) .peek(element -> handleBar(element.bar())) .peek(element -> handleBaz(element.baz())) .foreach(element-> handleQux(element.qux()));
独眼巨人反应 ,一个我贡献的图书馆,有一个静态的方法,可以让你复制一个stream(并返回一个JOOλstream的元组)。
Stream<Integer> stream = Stream.of(1,2,3); Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);
请参阅评论,在现有的stream上使用重复项时会导致性能损失。 更高效的替代方法是使用Streamable:
还有一个(懒惰)的Streamable类,可以从Stream,Iterable或Array构造并重复播放多次。
Streamable<Integer> streamable = Streamable.of(1,2,3); streamable.stream().forEach(System.out::println); streamable.stream().forEach(System.out::println);
AsStreamable.synchronizedFromStream(stream) – 可以用来创build一个Streamable,它将以一种可以跨线程共享的方式延迟填充它的后备集合。 Streamable.fromStream(stream)不会产生任何同步开销。
对于这个特定的问题,你也可以使用分区。 就像是
// Partition Eighters into left and right List<Either<Pair<A, Throwable>, A>> results = doSomething(); Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft())); passingFailing.get(true) <- here will be all passing (left values) passingFailing.get(false) <- here will be all failing (right values)