从stream中收集连续的对

给定诸如{ 0, 1, 2, 3, 4 }

我怎样才能最优雅地将其转化为给定的forms:

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

(当然,我已经定义了类对)?

编辑:这不是严格整数或原始stream。 答案应该是一般的任何types的stream。

我扩展标准stream的StreamEx库为所有streamtypes提供了一个pairMap方法。 对于原始stream,它不会改变stream的types,但可以用来进行一些计算。 最常见的用法是计算差异:

 int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (ba)).toArray(); 

对于对象stream,您可以创build任何其他对象types。 我的库不提供任何新的用户可见的数据结构,如Pair (这是库概念的一部分)。 但是,如果您有自己的Pair类并想要使用它,则可以执行以下操作:

 Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new); 

或者如果你已经有一些Stream

 Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new); 

该function是使用自定义分割器实现的。 它具有相当低的开销,可以很好地并行化。 当然,它可以和任何stream源一起工作,不像其他许多解决scheme那样只是随机访问列表/数组。 在许多testing中performance非常好。 这是一个JMH基准,我们使用不同的方法find所有的input值在更大的值之前(见这个问题)。

Java 8stream库主要用于将stream分割成更小的块进行并行处理,因此有状态的stream水线阶段非常有限,不支持获取当前stream元素的索引和访问相邻stream元素等。

解决这些问题的一个典型方法,当然有一些限制,就是通过索引来驱动stream,并且依赖于在一些随机访问数据结构中处理值,比如可以从中检索元素的ArrayList。 如果这些值在arrayList ,可以按照如下方式生成对:

  IntStream.range(1, arrayList.size()) .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i))) .forEach(System.out::println); 

当然限制是input不能是无限的stream。 不过,这个pipe道可以并行运行。

没有优雅,这是一个黑客的解决scheme,但适用于无限的stream

 Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers new Function<Integer, Pair>() { Integer previous; @Override public Pair apply(Integer integer) { Pair pair = null; if (previous != null) pair = new Pair(previous, integer); previous = integer; return pair; } }).skip(1); // drop first null 

现在,只要你愿意,你可以限制你的信息stream

 pairStream.limit(1_000_000).forEach(i -> System.out.println(i)); 

PS我希望有更好的解决scheme,像clojure (partition 2 1 stream)

我已经实现了一个spliterator包装器,它从原来的分割器中获取每个n元素T ,并生成List<T>

 public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> { private final Spliterator<T> wrappedSpliterator; private final int n; private final Deque<T> deque; private final Consumer<T> dequeConsumer; public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) { this.wrappedSpliterator = wrappedSpliterator; this.n = n; this.deque = new ArrayDeque<>(); this.dequeConsumer = deque::addLast; } @Override public boolean tryAdvance(Consumer<? super List<T>> action) { deque.pollFirst(); fillDeque(); if (deque.size() == n) { List<T> list = new ArrayList<>(deque); action.accept(list); return true; } else { return false; } } private void fillDeque() { while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer)) ; } @Override public Spliterator<List<T>> trySplit() { return null; } @Override public long estimateSize() { return wrappedSpliterator.estimateSize(); } @Override public int characteristics() { return wrappedSpliterator.characteristics(); } } 

下面的方法可以用来创build一个连续的stream:

 public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) { Spliterator<E> spliterator = stream.spliterator(); Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n); return StreamSupport.stream(wrapper, false); } 

示例用法:

 consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2).map( new Function<List<Integer>, Pair>() { public Pair apply(List<Integer> list) { return new Pair(list.get(0), list.get(1)); } }).forEach(System.out::println); 

你可以用cyclops-react (我对这个库有贡献)做这个,使用滑动操作符。

  LazyFutureStream.of( 0, 1, 2, 3, 4 ) .sliding(2) .map(Pair::new); 

要么

  ReactiveSeq.of( 0, 1, 2, 3, 4 ) .sliding(2) .map(Pair::new); 

假设Pair构造函数可以接受2个元素的集合。

如果你想按4分组,并增加2也支持。

  ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE) .sliding(4,2) .forEach(System.out::println); 

在CyclopsstreamStreamUtils类中还提供了通过java.util.stream.Stream创build滑动视图的等效静态方法。

  StreamUtils.sliding(Stream.of(1,2,3,4),2) .map(Pair::new); 

注意: – 对于单线程操作ReactiveSeq会更合适。 LazyFutureStream扩展了ReactiveSeq,但主要是针对并行/并行使用(这是一个期货stream)。

LazyFutureStream扩展了ReactiveSeq,它从扩展了java.util.stream.Stream的awesomeJOOλ扩展了Seq,所以Lukas提出的解决scheme也可以用于Streamtypes。 对于任何感兴趣的人来说,窗口/滑动操作符之间的主要区别在于显而易见的相对功率/复杂度折衷以及适用于无限stream(滑动并不消耗stream,而是stream动时缓冲)。

质子包库提供了窗口function。 给定一个Pair类和一个Stream,你可以这样做:

 Stream<Integer> st = Stream.iterate(0 , x -> x + 1); Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1) .map(l -> new Pair<>(l.get(0), l.get(1))) .moreStreamOps(...); 

现在对stream包含:

 (0, 1) (1, 2) (2, 3) (3, 4) (4, ...) and so on 

find连续的对

如果您愿意使用第三方库并且不需要并行性,那么jOOλ将提供SQL风格的窗口函数,如下所示

 System.out.println( Seq.of(0, 1, 2, 3, 4) .window() .filter(w -> w.lead().isPresent()) .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class .toList() ); 

生产

 [(0, 1), (1, 2), (2, 3), (3, 4)] 

lead()函数以遍历顺序从窗口访问下一个值。

查找连续的三元组/四元组/ n元组

在评论中的一个问题是要求一个更一般的解决scheme,其中不应该收集对,而是n元组(或可能列出)。 这是一个替代方法:

 int n = 3; System.out.println( Seq.of(0, 1, 2, 3, 4) .window(0, n - 1) .filter(w -> w.count() == n) .map(w -> w.window().toList()) .toList() ); 

产生列表的列表

 [[0, 1, 2], [1, 2, 3], [2, 3, 4]] 

没有filter(w -> w.count() == n) ,结果会是

 [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]] 

免责声明:我为jOOλ背后的公司工作

我们可以使用RxJava (非常强大的反应扩展库)

 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1); pairObservable.take(10).forEach(b -> { b.forEach(n -> System.out.println(n)); System.out.println(); }); 

缓冲区 操作符将发射项目的Observable转换为Observable,发射这些项目的缓冲集合。

你可以用Stream.reduce()方法来做到这一点(我还没有看到使用这种技术的其他答案)。

 public static <T> List<Pair<T, T>> consecutive(List<T> list) { List<Pair<T, T>> pairs = new LinkedList<>(); list.stream().reduce((a, b) -> { pairs.add(new Pair<>(a, b)); return b; }); return pairs; } 

在你的情况下,我会写我的自定义IntFunction跟踪最后一个int传递和使用它来映射原始的IntStream。

 import java.util.function.IntFunction; import java.util.stream.IntStream; public class PairFunction implements IntFunction<PairFunction.Pair> { public static class Pair { private final int first; private final int second; public Pair(int first, int second) { this.first = first; this.second = second; } @Override public String toString() { return "[" + first + "|" + second + "]"; } } private int last; private boolean first = true; @Override public Pair apply(int value) { Pair pair = !first ? new Pair(last, value) : null; last = value; first = false; return pair; } public static void main(String[] args) { IntStream intStream = IntStream.of(0, 1, 2, 3, 4); final PairFunction pairFunction = new PairFunction(); intStream.mapToObj(pairFunction) .filter(p -> p != null) // filter out the null .forEach(System.out::println); // display each Pair } } 

这个操作本质上是有状态的,所以并不是真正意味着要解决什么stream – 请参阅javadoc中的“无状态行为”部分:

最好的方法是避免有状态的行为参数完全stream式操作

这里的一个解决scheme是通过外部计数器在你的数据stream中引入状态,虽然它只能用于连续的数据stream。

 public static void main(String[] args) { Stream<String> strings = Stream.of("a", "b", "c", "c"); AtomicReference<String> previous = new AtomicReference<>(); List<Pair> collect = strings.map(n -> { String p = previous.getAndSet(n); return p == null ? null : new Pair(p, n); }) .filter(p -> p != null) .collect(toList()); System.out.println(collect); } static class Pair<T> { private T left, right; Pair(T left, T right) { this.left = left; this.right = right; } @Override public String toString() { return "{" + left + "," + right + '}'; } } 

为了计算时间序列的时间(x值)的连续差异,我使用stream的collect(…)方法:

 final List< Long > intervals = timeSeries.data().stream() .map( TimeSeries.Datum::x ) .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine ) .intervals(); 

其中的DifferenceCollector是这样的:

 pubic class DifferenceCollector implements LongConsumer { private final List< Long > intervals = new ArrayList<>(); private Long lastTime; @Override public void accept( final long time ) { if( Objects.isNull( lastTime ) ) { lastTime = time; } else { intervals.add( time - lastTime ); lastTime = time; } } public void combine( final DifferenceCollector other ) { intervals.addAll( other.intervals ); lastTime = other.lastTime; } public List< Long > intervals() { return intervals; } } 

你可以修改这个以适应你的需求。

一个优雅的解决scheme是使用zip 。 就像是:

 List<Integer> input = Arrays.asList(0, 1, 2, 3, 4); Stream<Pair> pairStream = Streams.zip(input.stream(), input.stream().substream(1), (a, b) -> new Pair(a, b) ); 

这是非常简洁和优雅,但它使用列表作为input。 无限的stream源不能被这样处理。

另一个(更麻烦的问题)是zip和整个Streams类一起被最近从API中删除。 上面的代码只适用于b95或更老版本。 所以用最新的JDK我会说没有优雅的FP风格的解决scheme,现在我们只能希望以某种方式将zip重新引入API。

这是一个有趣的问题。 我的混合尝试低于任何好处?

 public static void main(String[] args) { List<Integer> list = Arrays.asList(1, 2, 3); Iterator<Integer> first = list.iterator(); first.next(); if (first.hasNext()) list.stream() .skip(1) .map(v -> new Pair(first.next(), v)) .forEach(System.out::println); } 

我相信这不适合平行处理,因此可能会被取消资格。

正如其他人所看到的那样,由于问题的性质,需要一些有状态的东西。

我遇到了一个类似的问题,在这个问题中我想要的是Oracle SQL函数LEAD。 我的尝试如下。

 /** * Stream that pairs each element in the stream with the next subsequent element. * The final pair will have only the first item, the second will be null. */ <T> Spliterator<Pair<T>> lead(final Stream<T> stream) { final Iterator<T> input = stream.sequential().iterator(); final Iterable<Pair<T>> iterable = () -> { return new Iterator<Pair<T>>() { Optional<T> current = getOptionalNext(input); @Override public boolean hasNext() { return current.isPresent(); } @Override public Pair<T> next() { Optional<T> next = getOptionalNext(input); final Pair<T> pair = next.isPresent() ? new Pair(current.get(), next.get()) : new Pair(current.get(), null); current = next; return pair; } }; }; return iterable.spliterator(); } private <T> Optional<T> getOptionalNext(final Iterator<T> iterator) { return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(); } 

你可以通过使用一个有界的队列来存储在stream中stream动的元素(这是基于我在这里详细描述的思想: 是否有可能获得stream中的下一个元素? )

Belows示例首先定义了BoundedQueue类的实例,它将存储通过stream的元素(如果您不喜欢扩展LinkedList的想法,请参阅上面提到的用于替代和更通用的方法的链接)。 稍后,您只需将两个后续元素组合到Pair实例中:

 public class TwoSubsequentElems { public static void main(String[] args) { List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4)); class BoundedQueue<T> extends LinkedList<T> { public BoundedQueue<T> save(T curElem) { if (size() == 2) { // we need to know only two subsequent elements pollLast(); // remove last to keep only requested number of elements } offerFirst(curElem); return this; } public T getPrevious() { return (size() < 2) ? null : getLast(); } public T getCurrent() { return (size() == 0) ? null : getFirst(); } } BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>(); final List<Pair<Integer>> answer = input.stream() .map(i -> streamHistory.save(i)) .filter(e -> e.getPrevious() != null) .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent())) .collect(Collectors.toList()); answer.forEach(System.out::println); } } 

我同意@aepurniet,而是映射你必须使用mapToObj

 range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println); 

运行一个从0到length-1for循环

 for(int i = 0 ; i < stream.length-1 ; i++) { Pair pair = new Pair(stream[i], stream[i+1]); // then add your pair to an array }