Most efficient way to get the last element of a stream
Stream doesn't have a last() method:
Stream<T> stream;
T last = stream.last(); // No such method
What is the most elegant and/or efficient way to get the last element (or null for an empty Stream)?
Do a reduction that simply returns the current value:
Stream<T> stream;
T last = stream.reduce((a, b) -> b).orElse(null);
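As a minimal sketch of the same reduction without the null fallback (the class name `ReduceLast` and the method name `last` are made up for illustration), keeping the `Optional` lets the caller tell an empty stream apart from a present value. It assumes non-null elements, since `reduce` throws a `NullPointerException` when the reduction result is null.

```java
import java.util.Optional;
import java.util.stream.Stream;

class ReduceLast {
    /** Returns the last element of an ordered stream, or empty if the stream has no elements. */
    static <T> Optional<T> last(Stream<T> stream) {
        // The accumulator discards the left operand, so the right-most element survives.
        return stream.reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        System.out.println(last(Stream.of("a", "b", "c")));  // Optional[c]
        System.out.println(last(Stream.<String>empty()));    // Optional.empty
    }
}
```

Like the one-liner above, this still evaluates every element of the pipeline; only the handling of the empty case differs.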
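This heavily depends on the nature of the Stream. Keep in mind that "simple" does not necessarily mean "efficient". If you suspect the stream to be very large, to carry heavy operations, or to have a source that knows its size in advance, the following might be substantially more efficient than the simple solution: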
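import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;

static <T> T getLast(Stream<T> stream) {
    Spliterator<T> sp = stream.spliterator();
    if (sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED)) {
        // Sized source: keep splitting off prefixes and stay on the suffix.
        for (;;) {
            Spliterator<T> part = sp.trySplit();
            if (part == null) break;
            if (sp.getExactSizeIfKnown() == 0) {
                // The suffix turned out to be empty, so the last element is in the prefix.
                sp = part;
                break;
            }
        }
    }
    T value = null;
    for (Iterator<T> it = recursive(sp); it.hasNext(); )
        value = it.next();
    return value;
}

private static <T> Iterator<T> recursive(Spliterator<T> sp) {
    Spliterator<T> prev = sp.trySplit();
    if (prev == null) return Spliterators.iterator(sp);
    // Prefer the suffix; fall back to the prefix only if the suffix has no elements.
    Iterator<T> it = recursive(sp);
    if (it != null && it.hasNext()) return it;
    return recursive(prev);
}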
You can illustrate the difference with the following example:
String s = getLast(
    IntStream.range(0, 10_000_000).mapToObj(i -> {
        System.out.println("potential heavy operation on " + i);
        return String.valueOf(i);
    }).parallel()
);
System.out.println(s);
It will print:
potential heavy operation on 9999999
9999999
In other words, it did not perform the operation on the first 9999999 elements but only on the last one.
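To make the difference measurable rather than printed, here is a small sketch that counts how often the mapping function runs for the simple reduction and for a simplified split-based lookup. The names `EvaluationCount`, `pipeline` and `lastBySplitting` are hypothetical, the range is shortened to 100,000 elements, and the split loop assumes a SIZED|SUBSIZED source such as `IntStream.range`, where the retained suffix is never empty.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class EvaluationCount {
    static final int N = 100_000;

    /** Builds the pipeline from the example and counts how often the mapping function runs. */
    static Stream<String> pipeline(AtomicLong calls) {
        return IntStream.range(0, N).mapToObj(i -> {
            calls.incrementAndGet();
            return String.valueOf(i);
        }).parallel();
    }

    /** Simplified split-based lookup; assumes a sized source whose suffix is never empty. */
    static <T> T lastBySplitting(Stream<T> stream) {
        Spliterator<T> sp = stream.spliterator();
        // Each trySplit() hands back a prefix and leaves the suffix in sp.
        while (sp.trySplit() != null) {
            // keep narrowing the suffix until it cannot be split any further
        }
        T value = null;
        for (Iterator<T> it = Spliterators.iterator(sp); it.hasNext(); ) {
            value = it.next();
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicLong reduceCalls = new AtomicLong();
        String viaReduce = pipeline(reduceCalls).reduce((a, b) -> b).orElse(null);

        AtomicLong splitCalls = new AtomicLong();
        String viaSplit = lastBySplitting(pipeline(splitCalls));

        System.out.println(viaReduce + " after " + reduceCalls.get() + " mapper calls");
        System.out.println(viaSplit + " after " + splitCalls.get() + " mapper calls");
    }
}
```

Both calls return the same element; the reduction has to run the mapper for every element, while the split-based version only runs it for whatever remains in the final suffix.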
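This is just a refactoring of Holger's answer because the code, while fantastic, is a bit hard to read and understand, especially for people who were not C programmers before Java. Hopefully my refactored example class is a little easier to follow for those who are not familiar with spliterators, what they do, or how they work.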
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class LastElementFinderExample {
    public static void main(String[] args) {
        String s = getLast(
            LongStream.range(0, 10_000_000_000L).mapToObj(i -> {
                System.out.println("potential heavy operation on " + i);
                return String.valueOf(i);
            }).parallel()
        );
        System.out.println(s);
    }

    public static <T> T getLast(Stream<T> stream) {
        Spliterator<T> sp = stream.spliterator();
        if (isSized(sp)) {
            sp = getLastSplit(sp);
        }
        return getIteratorLastValue(getLastIterator(sp));
    }

    private static boolean isSized(Spliterator<?> sp) {
        return sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    private static <T> Spliterator<T> getLastSplit(Spliterator<T> sp) {
        return splitUntil(sp, s -> s.getExactSizeIfKnown() == 0);
    }

    private static <T> Iterator<T> getLastIterator(Spliterator<T> sp) {
        return Spliterators.iterator(splitUntil(sp, null));
    }

    private static <T> T getIteratorLastValue(Iterator<T> it) {
        T result = null;
        while (it.hasNext()) {
            result = it.next();
        }
        return result;
    }

    // Caveat: with a null condition every split descends into the prefix returned by
    // trySplit(), so for parallel streams without SIZED|SUBSIZED this may not reach
    // the true last element (Holger's recursive version tries the suffix first).
    private static <T> Spliterator<T> splitUntil(Spliterator<T> sp, Predicate<Spliterator<T>> condition) {
        Spliterator<T> result = sp;
        for (Spliterator<T> part = sp.trySplit(); part != null; part = result.trySplit()) {
            if (condition == null || condition.test(result)) {
                result = part;
            }
        }
        return result;
    }
}
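For readers new to spliterators, the following sketch shows the one behaviour the code above relies on: `trySplit()` returns a spliterator covering a prefix of the elements, and the spliterator it was called on keeps the rest. The class and method names (`TrySplitDemo`, `splitOnce`) are invented for this illustration, and the exact split sizes are an implementation detail, so none are asserted here.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

class TrySplitDemo {
    /** Splits a sized spliterator once and returns {prefix size, remaining size}. */
    static long[] splitOnce(Spliterator.OfInt source) {
        Spliterator.OfInt prefix = source.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] {prefixSize, source.estimateSize()};
    }

    public static void main(String[] args) {
        Spliterator.OfInt sp = IntStream.range(0, 8).spliterator();
        long[] sizes = splitOnce(sp);
        System.out.println("prefix=" + sizes[0] + ", remaining=" + sizes[1]);

        // Whatever is still in sp is the tail of the range, so its last element is 7.
        int[] last = new int[1];
        sp.forEachRemaining((int i) -> last[0] = i);
        System.out.println("last element of remaining part: " + last[0]);
    }
}
```

Because the tail always stays in the original spliterator, repeatedly splitting and discarding the prefix is what lets a sized source skip straight to its end.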
I believe this solution is more efficient and more readable than Holger's:
import java.util.Optional;
import java.util.Spliterator;
import static java.util.Spliterator.ORDERED;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
 * @param <T> the type of elements in the stream
 * @param stream a stream
 * @return the last element in the stream
 * @throws AssertionError if the stream is unordered
 */
public static <T> Optional<T> getLastElement(Stream<T> stream) {
    Spliterator<T> spliterator = stream.spliterator();
    assert (spliterator.hasCharacteristics(ORDERED)) : "Operation makes no sense on unordered streams";

    // First we skip as many elements as possible
    Consumer<T> noOp = input -> {};
    while (true) {
        // trySplit() moves the first spliterator forward by the size of the second spliterator
        Spliterator<T> second = spliterator.trySplit();
        if (second == null)
            break;
        // Note: when tryAdvance() succeeds it consumes one element of the first spliterator
        if (!spliterator.tryAdvance(noOp)) {
            // If the first spliterator is empty, continue splitting the second spliterator
            spliterator = second;
        }
    }

    // Then we consume the last element(s)
    LastElementConsumer<T> consumer = new LastElementConsumer<>();
    spliterator.forEachRemaining(consumer);
    return consumer.get();
}
[…]
import java.util.Optional;
import java.util.function.Consumer;

/**
 * A consumer that returns the last value that was consumed.
 * <p>
 * @param <T> the type of elements to consume
 * @author Gili Tzabari
 */
public final class LastElementConsumer<T> implements Consumer<T> {
    private Optional<T> result = Optional.empty();

    @Override
    public void accept(T t) {
        result = Optional.of(t);
    }

    /**
     * @return the last value that was consumed
     */
    public Optional<T> get() {
        return result;
    }
}
If you run:
// getLastElement returns an Optional, so unwrap it before assigning to a String
String s = getLastElement(
    IntStream.range(0, 10_000_000).mapToObj(i -> {
        System.out.println("Potential heavy operation on " + i);
        return String.valueOf(i);
    }).parallel()
).orElse(null);
System.out.println(s);
It will print the same output as Holger's solution:
Potential heavy operation on 9999999
9999999
In other words, it did not perform the operation on the first 9999999 elements but only on the last one.
Here is another solution (not that efficient):
List<String> list = Arrays.asList("abc", "ab", "cc");
long count = list.stream().count();
list.stream().skip(count - 1).findFirst().ifPresent(System.out::println);
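The snippet above needs two streams over the same data, because a stream can only be consumed once, and `skip(count - 1)` would be called with -1 for an empty source, which throws an `IllegalArgumentException`. A hedged sketch that packages both points, using a hypothetical `SkipLast.last` helper that takes a `Supplier` of streams:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SkipLast {
    /** Counts one stream, then skips to the last element of a second, identical stream. */
    static <T> Optional<T> last(Supplier<Stream<T>> streams) {
        long count = streams.get().count();
        if (count == 0) {
            return Optional.empty();   // skip(-1) would throw IllegalArgumentException
        }
        return streams.get().skip(count - 1).findFirst();
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("abc", "ab", "cc");
        last(list::stream).ifPresent(System.out::println);   // cc
    }
}
```

The source is traversed up to twice, which is why this approach only makes sense when re-creating the stream is cheap.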
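Parallel unsized streams combined with the "skip" method are tricky, and @Holger's implementation gives a wrong answer for them. Also, @Holger's implementation is a bit slower because it uses iterators.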
An optimisation of @Holger's answer:
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");
    Spliterator<? extends T> spliterator = stream.spliterator();
    Spliterator<? extends T> lastSpliterator = spliterator;

    // Note that this method does not work very well with
    // unsized parallel streams when used with skip methods.
    // In those cases it will answer Optional.empty.

    // Find the last spliterator with estimate size
    // Meaningful only on unsized parallel streams
    if (spliterator.estimateSize() == Long.MAX_VALUE) {
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            lastSpliterator = prev;
        }
    }

    // Find the last spliterator on sized streams
    // Meaningful only on parallel streams (note that unsized was transformed in sized)
    for (Spliterator<? extends T> prev = lastSpliterator.trySplit(); prev != null; prev = lastSpliterator.trySplit()) {
        if (lastSpliterator.estimateSize() == 0) {
            lastSpliterator = prev;
            break;
        }
    }

    // Find the last element of the last spliterator
    // Parallel streams only perform the operation on one element
    AtomicReference<T> last = new AtomicReference<>();
    lastSpliterator.forEachRemaining(last::set);

    return Optional.ofNullable(last.get());
}
Unit testing using JUnit 5:
@Test
@DisplayName("last sequential sized")
void last_sequential_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last sequential unsized")
void last_sequential_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last parallel sized")
void last_parallel_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("getLast parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    // Unfortunately unsized parallel streams do not work very well with skip
    //assertThat(Streams.last(stream)).hasValue(expected);
    //assertThat(count).hasValue(1);

    // @Holger implementation gives wrong answer!!
    //assertThat(Streams.getLast(stream)).hasValue(9_950_000L); //!!!
    //assertThat(count).hasValue(1);

    // This is not a very good answer either, but it is better
    assertThat(Streams.last(stream)).isEmpty();
    assertThat(count).hasValue(0);
}
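The only solution that supports both scenarios is to avoid detecting the last spliterator on unsized parallel streams. The consequence is that the solution will perform the operations on all elements, but it will always give the right answer.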
Note that in sequential streams, it will perform the operations on all elements anyway.
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");
    Spliterator<? extends T> spliterator = stream.spliterator();

    // Find the last spliterator with estimate size (sized parallel streams)
    if (spliterator.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED)) {
        // Find the last spliterator on sized streams (parallel streams)
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            if (spliterator.getExactSizeIfKnown() == 0) {
                spliterator = prev;
                break;
            }
        }
    }

    // Find the last element of the spliterator
    //AtomicReference<T> last = new AtomicReference<>();
    //spliterator.forEachRemaining(last::set);
    //return Optional.ofNullable(last.get());

    // A better one that supports native parallel streams
    return (Optional<T>) StreamSupport.stream(spliterator, stream.isParallel())
        .reduce((a, b) -> b);
}
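The `SIZED | SUBSIZED` check in the method above decides which path a stream takes. The sketch below shows how the re-wrapping used in the tests (`Iterable::spliterator` via `StreamSupport.stream`) turns a sized stream into an unsized one. `SizedCheck`, `isSized` and `unsized` are hypothetical helper names used only for this illustration.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SizedCheck {
    /** True if the stream's spliterator reports both SIZED and SUBSIZED. */
    static boolean isSized(Stream<?> stream) {
        Spliterator<?> sp = stream.spliterator();
        return sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    /** Re-wraps a stream through its iterator, which discards the size information. */
    static <T> Stream<T> unsized(Stream<T> stream) {
        boolean parallel = stream.isParallel();
        Iterable<T> iterable = stream::iterator;
        return StreamSupport.stream(iterable.spliterator(), parallel);
    }

    public static void main(String[] args) {
        System.out.println(isSized(Arrays.asList(1, 2, 3).stream()));           // true
        System.out.println(isSized(unsized(Arrays.asList(1, 2, 3).stream())));  // false
    }
}
```

An array-backed list reports its exact size, whereas the default `Iterable.spliterator()` is built from an iterator of unknown size, so the second stream falls through to the plain reduction.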
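Regarding the unit tests for this implementation, the first three tests are exactly the same (sequential, and sized parallel). The tests for unsized parallel streams are here: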
@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(10_000_000L);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}