用谓词限制stream
是否有一个Java 8stream操作限制(可能是无限的) Stream
直到第一个元素无法匹配一个谓词? 在下面的例子中看起来像(不存在的) takeWhile
操作,并打印所有小于10的数字?
IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
如果没有这样的操作,通用的方法是什么?
这样的操作应该可以通过Java 8 Stream
,但不一定能够高效地完成 – 例如,您不一定要并行执行这样的操作,因为您必须按顺序查看元素。
API不提供一个简单的方法来做到这一点,但最简单的方法是采取Stream.iterator()
,包装Iterator
有一个“采取”实施,然后返回到Spliterator
,然后Stream
。 或者 – 也许 – 包装Spliterator
,尽pipe在这个实现中它不能再被分割。
下面是takeWhile
上未经testing的takeWhile
实现:
static <T> Spliterator<T> takeWhile( Spliterator<T> splitr, Predicate<? super T> predicate) { return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { boolean stillGoing = true; @Override public boolean tryAdvance(Consumer<? super T> consumer) { if (stillGoing) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem)) { consumer.accept(elem); } else { stillGoing = false; } }); return hadNext && stillGoing; } return false; } }; } static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); }
操作takeWhile
和dropWhile
已被添加到JDK 9中。您的示例代码
IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
在JDK 9下编译和运行时,其行为与您期望的完全相同。
在撰写本文时,JDK 9尚未发布。 所有规格如有变更。 预发布快照构build和文档可在https://jdk9.java.net/获得; 。 目前的直接链接在这里 。
allMatch()
是一个短路function,所以你可以用它来停止处理。 主要的缺点是你必须做两次testing:一次看你是否应该处理它,再看看是否继续下去。
IntStream .iterate(1, n -> n + 1) .peek(n->{if (n<10) System.out.println(n);}) .allMatch(n->n < 10);
作为@StuartMarks答案的后续。 我的StreamEx库具有与当前的JDK-9实现兼容的takeWhile
操作。 在JDK-9下运行时,它只是委托给JDK实现(通过MethodHandle.invokeExact
,它非常快)。 在JDK-8下运行时,将使用“polyfill”实现。 所以使用我的库,问题可以这样解决:
IntStreamEx.iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
takeWhile
是protonpack库提供的function之一。
Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1); Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10); assertThat(finiteInts.collect(Collectors.toList()), hasSize(10));
我相信这可以大大改善:(有人可能会使线程安全也许)
Stream<Integer> stream = Stream.iterate(0, n -> n + 1); TakeWhile.stream(stream, n -> n < 10000) .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));
一个肯定的黑客…不高雅 – 但它的作品〜:D
class TakeWhile<T> implements Iterator<T> { private final Iterator<T> iterator; private final Predicate<T> predicate; private volatile T next; private volatile boolean keepGoing = true; public TakeWhile(Stream<T> s, Predicate<T> p) { this.iterator = s.iterator(); this.predicate = p; } @Override public boolean hasNext() { if (!keepGoing) { return false; } if (next != null) { return true; } if (iterator.hasNext()) { next = iterator.next(); keepGoing = predicate.test(next); if (!keepGoing) { next = null; } } return next != null; } @Override public T next() { if (next == null) { if (!hasNext()) { throw new NoSuchElementException("Sorry. Nothing for you."); } } T temp = next; next = null; return temp; } public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) { TakeWhile tw = new TakeWhile(s, p); Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED); return StreamSupport.stream(split, false); } }
你可以使用java8 + rxjava 。
import java.util.stream.IntStream; import rx.Observable; // Example 1) IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> { System.out.println(n); return n < 10; } ).subscribe() ; // Example 2 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> n < 10) .forEach( n -> System.out.println(n));
这是一个在整数上完成的版本 – 如问题中所述。
用法:
StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);
以下是StreamUtil的代码:
import java.util.PrimitiveIterator; import java.util.Spliterators; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class StreamUtil { public static IntStream takeWhile(IntStream stream, IntPredicate predicate) { return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false); } private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator { private final PrimitiveIterator.OfInt iterator; private final IntPredicate predicate; public PredicateIntSpliterator(IntStream stream, IntPredicate predicate) { super(Long.MAX_VALUE, IMMUTABLE); this.iterator = stream.iterator(); this.predicate = predicate; } @Override public boolean tryAdvance(IntConsumer action) { if (iterator.hasNext()) { int value = iterator.nextInt(); if (predicate.test(value)) { action.accept(value); return true; } } return false; } } }
这是从JDK 9 java.util.stream.Stream.takeWhile(Predicate)复制的源代码。
static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) { class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> { private static final int CANCEL_CHECK_COUNT = 63; private final Spliterator<T> s; private int count; private T t; private final AtomicBoolean cancel = new AtomicBoolean(); private boolean takeOrDrop = true; Taking(Spliterator<T> s) { super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); this.s = s; } @Override public boolean tryAdvance(Consumer<? super T> action) { boolean test = true; if (takeOrDrop && // If can take (count != 0 || !cancel.get()) && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Comparator<? super T> getComparator() { return s.getComparator(); } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } @Override public Spliterator<T> trySplit() { return null; } } return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close); }
去图书馆AbacusUtil 。 它提供了你想要的确切API和更多:
IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);
声明:我是AbacusUtil的开发人员。
除了短路terminal操作之外,您不能中止stream,这会使一些stream值不处理,而不考虑它们的值。 但是如果你只是想避免在一个stream上的操作,你可以添加一个转换和过滤到stream:
import java.util.Objects; class ThingProcessor { static Thing returnNullOnCondition(Thing thing) { return( (*** is condition met ***)? null : thing); } void processThings(Collection<Thing> thingsCollection) { thingsCollection.stream() *** regular stream processing *** .map(ThingProcessor::returnNullOnCondition) .filter(Objects::nonNull) *** continue stream processing *** } } // class ThingProcessor
当事物满足某些条件时,将事物stream转换为空值,然后过滤掉空值。 如果你愿意沉迷于副作用,一旦遇到一些事情,你可以设置条件值为真,所有后续的事情都会被过滤掉,而不pipe它们的值如何。 但是,即使不是,您也可以通过从您不想处理的stream中过滤值来节省大量(如果不是全部的话)处理。
实际上有两种方法可以在Java 8中完成,没有任何额外的库或使用Java 9。
如果你想在控制台上打印2到20的数字,你可以这样做:
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);
要么
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);
输出是在这两种情况下:
2 4 6 8 10 12 14 16 18 20
没有人提到anyMatch呢。 这是这篇文章的原因。
即使我有类似的要求 – 调用Web服务,如果失败,重试3次。 如果在经过这么多次审判后仍然失败,请发送电子邮件通知。 经过Googlesearch,“anyMatch”成为了一个救世主。 我的示例代码如下。 在以下示例中,如果“webServiceCall”方法在第一次迭代本身中返回true,则由于我们称之为“anyMatch”,stream不会进一步迭代。 我相信,这是你正在寻找的。
import java.util.stream.IntStream;
import io.netty.util.internal.ThreadLocalRandom;
类TrialStreamMatch {
public static void main(String[] args) { if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){ //Code for sending email notifications } } public static boolean webServiceCall(int i){ //For time being, I have written a code for generating boolean randomly //This whole piece needs to be replaced by actual web-service client code boolean bool = ThreadLocalRandom.current().nextBoolean(); System.out.println("Iteration index :: "+i+" bool :: "+bool); //Return success status -- true or false return bool; }
}
我有另一个快速的解决scheme,通过实施(这实际上是不干净的,但你明白了):
public static void main(String[] args) { System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15) .map(o -> o.toString()).collect(Collectors.joining(", "))); } static interface TerminatedStream<T> { Stream<T> terminateOn(T e); } static class StreamUtil { static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) { return new TerminatedStream<T>() { public Stream<T> terminateOn(T e) { Builder<T> builder = Stream.<T> builder().add(seed); T current = seed; while (!current.equals(e)) { current = op.apply(current); builder.add(current); } return builder.build(); } }; } }
这里是我使用Java Stream库的尝试。
IntStream.iterate(0, i -> i + 1) .filter(n -> { if (n < 10) { System.out.println(n); return false; } else { return true; } }) .findAny();
如果你有不同的问题,可能需要不同的解决scheme,但是对于你目前的问题,我只需要:
IntStream .iterate(1, n -> n + 1) .limit(10) .forEach(System.out::println);
- SwingWorker,Thread.sleep()或javax.swing.timer? 我需要“插入一个暂停”
- 你可以扔什么Java?
- 为Java程序创buildWindows安装程序
- Javadoc注释中的多行代码示例
- 按属性分组对象列表:Java
- 使用JPA存储映射<String,String>
- Java和libGDX / LWJGL游戏全屏错误的大小在Ubuntu上的多个显示器
- Java中System.out.println的含义是什么?
- IncompatibleClassChangeError:class ClassMetadataReadingVisitor将接口ClassVisitor作为超类