用谓词限制stream

是否有一个Java 8stream操作限制(可能是无限的) Stream直到第一个元素无法匹配一个谓词? 在下面的例子中看起来像(不存在的) takeWhile操作,并打印所有小于10的数字?

 IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

如果没有这样的操作,通用的方法是什么?

这样的操作应该可以通过Java 8 Stream ,但不一定能够高效地完成 – 例如,您不一定要并行执行这样的操作,因为您必须按顺序查看元素。

API不提供一个简单的方法来做到这一点,但最简单的方法是采取Stream.iterator() ,包装Iterator有一个“采取”实施,然后返回到Spliterator ,然后Stream 。 或者 – 也许 – 包装Spliterator ,尽pipe在这个实现中它不能再被分割。

下面是takeWhile上未经testing的takeWhile实现:

 static <T> Spliterator<T> takeWhile( Spliterator<T> splitr, Predicate<? super T> predicate) { return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { boolean stillGoing = true; @Override public boolean tryAdvance(Consumer<? super T> consumer) { if (stillGoing) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem)) { consumer.accept(elem); } else { stillGoing = false; } }); return hadNext && stillGoing; } return false; } }; } static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); } 

操作takeWhiledropWhile已被添加到JDK 9中。您的示例代码

 IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

在JDK 9下编译和运行时,其行为与您期望的完全相同。

在撰写本文时,JDK 9尚未发布。 所有规格如有变更。 预发布快照构build和文档可在https://jdk9.java.net/获得; 。 目前的直接链接在这里 。

allMatch()是一个短路function,所以你可以用它来停止处理。 主要的缺点是你必须做两次testing:一次看你是否应该处理它,再看看是否继续下去。

 IntStream .iterate(1, n -> n + 1) .peek(n->{if (n<10) System.out.println(n);}) .allMatch(n->n < 10); 

作为@StuartMarks答案的后续。 我的StreamEx库具有与当前的JDK-9实现兼容的takeWhile操作。 在JDK-9下运行时,它只是委托给JDK实现(通过MethodHandle.invokeExact ,它非常快)。 在JDK-8下运行时,将使用“polyfill”实现。 所以使用我的库,问题可以这样解决:

 IntStreamEx.iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

takeWhile是protonpack库提供的function之一。

 Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1); Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10); assertThat(finiteInts.collect(Collectors.toList()), hasSize(10)); 

我相信这可以大大改善:(有人可能会使线程安全也许)

 Stream<Integer> stream = Stream.iterate(0, n -> n + 1); TakeWhile.stream(stream, n -> n < 10000) .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n))); 

一个肯定的黑客…不高雅 – 但它的作品〜:D

 class TakeWhile<T> implements Iterator<T> { private final Iterator<T> iterator; private final Predicate<T> predicate; private volatile T next; private volatile boolean keepGoing = true; public TakeWhile(Stream<T> s, Predicate<T> p) { this.iterator = s.iterator(); this.predicate = p; } @Override public boolean hasNext() { if (!keepGoing) { return false; } if (next != null) { return true; } if (iterator.hasNext()) { next = iterator.next(); keepGoing = predicate.test(next); if (!keepGoing) { next = null; } } return next != null; } @Override public T next() { if (next == null) { if (!hasNext()) { throw new NoSuchElementException("Sorry. Nothing for you."); } } T temp = next; next = null; return temp; } public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) { TakeWhile tw = new TakeWhile(s, p); Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED); return StreamSupport.stream(split, false); } } 

你可以使用java8 + rxjava 。

 import java.util.stream.IntStream; import rx.Observable; // Example 1) IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> { System.out.println(n); return n < 10; } ).subscribe() ; // Example 2 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> n < 10) .forEach( n -> System.out.println(n)); 

这是一个在整数上完成的版本 – 如问题中所述。

用法:

 StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10); 

以下是StreamUtil的代码:

 import java.util.PrimitiveIterator; import java.util.Spliterators; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class StreamUtil { public static IntStream takeWhile(IntStream stream, IntPredicate predicate) { return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false); } private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator { private final PrimitiveIterator.OfInt iterator; private final IntPredicate predicate; public PredicateIntSpliterator(IntStream stream, IntPredicate predicate) { super(Long.MAX_VALUE, IMMUTABLE); this.iterator = stream.iterator(); this.predicate = predicate; } @Override public boolean tryAdvance(IntConsumer action) { if (iterator.hasNext()) { int value = iterator.nextInt(); if (predicate.test(value)) { action.accept(value); return true; } } return false; } } } 

这是从JDK 9 java.util.stream.Stream.takeWhile(Predicate)复制的源代码。

 static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) { class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> { private static final int CANCEL_CHECK_COUNT = 63; private final Spliterator<T> s; private int count; private T t; private final AtomicBoolean cancel = new AtomicBoolean(); private boolean takeOrDrop = true; Taking(Spliterator<T> s) { super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); this.s = s; } @Override public boolean tryAdvance(Consumer<? super T> action) { boolean test = true; if (takeOrDrop && // If can take (count != 0 || !cancel.get()) && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Comparator<? super T> getComparator() { return s.getComparator(); } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } @Override public Spliterator<T> trySplit() { return null; } } return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close); } 

去图书馆AbacusUtil 。 它提供了你想要的确切API和更多:

 IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println); 

声明:我是AbacusUtil的开发人员。

除了短路terminal操作之外,您不能中止stream,这会使一些stream值不处理,而不考虑它们的值。 但是如果你只是想避免在一个stream上的操作,你可以添加一个转换和过滤到stream:

 import java.util.Objects; class ThingProcessor { static Thing returnNullOnCondition(Thing thing) { return( (*** is condition met ***)? null : thing); } void processThings(Collection<Thing> thingsCollection) { thingsCollection.stream() *** regular stream processing *** .map(ThingProcessor::returnNullOnCondition) .filter(Objects::nonNull) *** continue stream processing *** } } // class ThingProcessor 

当事物满足某些条件时,将事物stream转换为空值,然后过滤掉空值。 如果你愿意沉迷于副作用,一旦遇到一些事情,你可以设置条件值为真,所有后续的事情都会被过滤掉,而不pipe它们的值如何。 但是,即使不是,您也可以通过从您不想处理的stream中过滤值来节省大量(如果不是全部的话)处理。

实际上有两种方法可以在Java 8中完成,没有任何额外的库或使用Java 9。

如果你想在控制台上打印2到20的数字,你可以这样做:

 IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20); 

要么

 IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20); 

输出是在这两种情况下:

 2 4 6 8 10 12 14 16 18 20 

没有人提到anyMatch呢。 这是这篇文章的原因。

即使我有类似的要求 – 调用Web服务,如果失败,重试3次。 如果在经过这么多次审判后仍然失败,请发送电子邮件通知。 经过Googlesearch,“anyMatch”成为了一个救世主。 我的示例代码如下。 在以下示例中,如果“webServiceCall”方法在第一次迭代本身中返回true,则由于我们称之为“anyMatch”,stream不会进一步迭代。 我相信,这是你正在寻找的。

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

类TrialStreamMatch {

 public static void main(String[] args) { if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){ //Code for sending email notifications } } public static boolean webServiceCall(int i){ //For time being, I have written a code for generating boolean randomly //This whole piece needs to be replaced by actual web-service client code boolean bool = ThreadLocalRandom.current().nextBoolean(); System.out.println("Iteration index :: "+i+" bool :: "+bool); //Return success status -- true or false return bool; } 

}

我有另一个快速的解决scheme,通过实施(这实际上是不干净的,但你明白了):

 public static void main(String[] args) { System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15) .map(o -> o.toString()).collect(Collectors.joining(", "))); } static interface TerminatedStream<T> { Stream<T> terminateOn(T e); } static class StreamUtil { static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) { return new TerminatedStream<T>() { public Stream<T> terminateOn(T e) { Builder<T> builder = Stream.<T> builder().add(seed); T current = seed; while (!current.equals(e)) { current = op.apply(current); builder.add(current); } return builder.build(); } }; } } 

这里是我使用Java Stream库的尝试。

  IntStream.iterate(0, i -> i + 1) .filter(n -> { if (n < 10) { System.out.println(n); return false; } else { return true; } }) .findAny(); 

如果你有不同的问题,可能需要不同的解决scheme,但是对于你目前的问题,我只需要:

 IntStream .iterate(1, n -> n + 1) .limit(10) .forEach(System.out::println);