这是Files.lines()中的错误,还是我误解了一些关于并行stream的问题?

环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25

我尝试使用Files.lines()并行stream,但我想.skip()的第一行(这是一个带有标题的CSV文件)。 所以我尝试这样做:

 try ( final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8) .skip(1L).parallel(); ) { // etc } 

但是,然后一列未能parsing为int …

所以我尝试了一些简单的代码。 该文件是问题是死的简单:

 $ cat info.csv startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes 1422758875023;34;54;151;4375;4375;27486 $ 

代码同样简单:

 public static void main(final String... args) { final Path path = Paths.get("/home/fge/tmp/dd/info.csv"); Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel() .forEach(System.out::println); } 

系统地得到以下结果(好吧,我只运行了大约20次):

 startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes 

我在这里错过了什么?


编辑看起来像这个问题,或误解,比这更深植根于下面的两个例子(FreeNode的## java的一个研究员煮熟):

 public static void main(final String... args) { new BufferedReader(new StringReader("Hello\nWorld")).lines() .skip(1L).parallel() .forEach(System.out::println); final Iterator<String> iter = Arrays.asList("Hello", "World").iterator(); final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED); final Stream<String> s = StreamSupport.stream(spliterator, true); s.skip(1L).forEach(System.out::println); } 

这打印:

 Hello Hello 

呃。

@Holgerbuild议,这种情况发生在ORDERED任何stream,而不是与这个其他样本的SIZED

 Stream.of("Hello", "World") .filter(x -> true) .parallel() .skip(1L) .forEach(System.out::println); 

另外,它源于所有已经发生的讨论,如果是这样的话,那么就是.forEach() (就像@SotiriosDelimanolis首先指出的那样 )。

由于这个问题的现状与此前的陈述完全相反,所以应该注意的是,现在Brian Goetz已经明确声明了经过skip操作的无序特性的反向传播被认为是一个bug 。 此外还指出 ,现在认为terminal操作的有序性没有任何反向传播。

还有一个相关的错误报告,JDK-8129120,其状态是“在Java 9中修复”,并且被反向 移植到Java 8,更新60

我用jdk1.8.0_60做了一些testing,看来现在的实现确实performance出更直观的行为。

这个答案已经过时 – 请阅读这一个 instEAD!


要快速回答这个问题: 观察到的行为是有意的! 没有错误,根据文档发生的一切。 但是,应该说,这种行为应该被logging和传达得更好。 forEach忽略sorting应该更明显。

我将首先介绍允许观察到的行为的概念 。 这为解释问题中给出的例子之一提供了背景。 我会在高层次上做这个,然后再在非常低的层次上做

[TL; DR:自己阅读, 高级解释会给出一个粗略的答案。]

概念

我们不要谈论Stream ,它是与stream相关的方法操作或返回的types,我们来讨论stream操作streampipe线 。 该方法调用linesskipparallel是stream操作,它build立一个streampipe道[1],正如其他人已经注意到的那样,当terminal操作forEach被调用时,stream水线被作为一个整体来处理[2]。

stream水线可以被认为是一系列的操作,它们在整个stream中被执行(例如,过滤所有元素,将剩余的元素映射到数字,将所有数字相加)。 但这是误导! 一个更好的比喻是,terminal操作通过每个操作拉动单个元素[3](例如获取下一个未过滤的元素,映射它,将它加起来,请求下一个元素)。 一些中间操作可能需要在返回所请求的下一个元素之前遍历几个(例如skip )或者甚至所有(例如sort )元素,并且这是操作中状态的来源之一。

每个操作都通过这些StreamOpFlag表示其特征:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

它们跨stream源,中间操作和terminal操作相结合,构成pipe道(整体)的特性,然后用于优化[4]。 同样,pipe道是否并行执行也是整个pipe道的一个属性[5]。

因此,无论何时您对这些特征做出假设,您都必须仔细考虑构buildpipe道的所有操作,而不考虑它们的应用顺序以及它们做出的保证。 这样做时请记住,terminal操作如何通过pipe道拉动每个单独的元素。

我们来看看这个特例:

 BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar")); fooBarReader.lines() .skip(1L) .parallel() .forEach(System.out::println); 

高水平

无论您的stream源是否有序(通过调用forEach (而不是forEachOrdered )),您声明该顺序对您无关紧要 [6],这有效地减less了从“跳过前n个元素”跳转到“跳过任何n个元素”[7](因为没有命令前者变得毫无意义)。

因此,如果这个承诺提速,那么你就有权忽略这个命令。 对于并行执行,显然是这样认为的,这就是为什么你得到观察到的输出。 因此,你观察到的是预期的行为 ,没有错误。

请注意,这与skip状态不冲突 ! 如上所述,有状态并不意味着它以某种方式caching整个stream(减去跳过的元素),并且在这些元素上执行下面的所有内容。 这只意味着操作有一些状态 – 即跳过元素的数量(好吧,这实际上并不那么容易,但是由于我对事情的理解有限,我认为这是一个公平的简化)。

低级

我们来更详细地看一下:

  1. BufferedReader.lines创buildStream ,让它_lines
    • 创build一个有序的Spliterator
    • 把它交给StreamSupport.stream ,它创build一个ReferencePipeline.Head ,并将spliterator标志转换成一个streamop标志
  2. .skip创build一个新的Stream ,我们称之为_skip
    • 调用ReferencePipeline.skip
    • 它用SliceOps.makeRef构造“切片”操作(skip&limit的泛化)
    • 这将创build一个ReferencePipeline.StatefulOp的匿名实例,它引用_lines作为其源
  3. .parallel ,并行设置整个stream水线的并行标志
  4. .forEach实际上开始执行

那么让我们看看pipe道是如何执行的:

  1. 调用_skip.forEach创build一个ForEachOp (我们称之为_forEach )并将其传递给_skip.evaluate ,它有两个作用:
    1. 调用sourceSpliterator在这个stream水线阶段的源代码周围创build一个分割器:
      • 调用opEvaluateParallelLazy本身(事实certificate)
      • 这决定了stream是无序的,并创build了一个 _sliceSpliterator (让我们称之为_sliceSpliteratorskip = 1 ,没有限制。
    2. 调用_forEach.evaluateParallel创build一个ForEachTask (因为它是无序的;我们称之为_forEachTask )并调用它
  2. _forEachTask.compute ,任务从最初的1024行中分离出来,为它创build一个新的任务(我们称之为_forEachTask2 ),意识到没有剩下的行和结束。
  3. 在fork连接池中, _forEachTask2.compute被调用,妄图再次分裂, 最后通过调用_skip.copyInto 开始将其元素复制到接收器 (一个stream_skip.copyIntoSystem.out.println封装器)。
  4. 这实质上是将任务委托给指定的分割器。 这是上面创build的_sliceSpliterator 因此_sliceSpliterator.forEachRemaining负责将未跳过的元素传递给println-sink:
    1. 它获得一个块(在这里是全部)到一个缓冲区并对它们进行计数
    2. 它试图通过acquirePermits请求许多许可证(我假设由于并行化)
    3. 有两个元素在源代码中,一个元素被跳过,只有一个它获得的许可证(通常我们说n
    4. 它让缓冲区把前n个元素(所以在这种情况下只有第一个元素)放入接收器

所以UnorderedSliceSpliterator.OfRef.forEachRemaining是顺序最终被真正忽略的地方。 我没有把它和有序的变体做比较,但这是我为什么这样做的假设:

  • 在并行化过程中,将分割器的元素铲入缓冲区可能会与其他执行相同操作的任务交错
  • 这将使得跟踪他们的订单非常困难
  • 这样做或者防止交织会降低性能,而且如果顺序无关紧要则毫无意义
  • 如果订单丢失,除了处理前n个允许的元素外,没有什么可做的了

任何问题? ;)对不起,这么久了。 也许我应该忽略细节,并写一个博客文章….

来源

[1] java.util.stream – stream操作和stream水线 :

stream操作分为中间操作和terminal操作,并组合起来形成streampipe线

[2] java.util.stream – stream操作和stream水线 :

直到stream水线的terminal操作被执行,才开始stream水线源的穿越。

[3]这个比喻代表了我对溪stream的理解。 除代码之外,主要来源是java.util.stream – Stream操作和pipe道的引用(突出显示了我的):

缓慢处理stream可以显着提高效率; 在诸如上述filter-map-sum示例的stream水线中,可以将过滤,映射和求和融合为数据上的单次通过,并具有最小的中间状态。 懒惰也可以避免在没有必要的时候检查所有的数据。 对于“查找长度超过1000个字符的第一个string”等操作,只需要检查足够的string就可以find具有所需特征的string,而不必检查源中可用的所有string。

[4] java.util.stream.StreamOpFlag

在pipe道的每个阶段,可以计算出组合的stream和操作标志[… jadda,jadda,jadda关于如何在源,中间和terminal操作之间组合标志 ]来产生从pipe道输出的标志。 这些标志可以用来应用优化。

在代码中,您可以在AbstractPipeline.combinedFlags看到这一点,在构build期间(以及其他一些事件)通过组合前一个操作和新操作的标志来设置AbstractPipeline.combinedFlags

[5] java.util.stream – 并行性 (我不能直接链接 – 向下滚动):

当terminal操作启动时,根据stream调用的stream的方向顺序或并行地执行streampipe线。

在代码中,您可以看到这是在AbstractPipeline.sequentialparallelisParallel中设置/检查stream源上的布尔标志,使得在构造stream时调用setter时无关紧要。

[6] java.util.stream.Stream.forEach :

对此stream的每个元素执行操作。 […]这个行为的行为是明确的不确定的。

将其与java.util.stream.Stream.forEachOrdered对比:

为stream的每个元素执行操作,如果stream具有已定义的遇到顺序,则按stream的遇到顺序执行操作。

[7]这也没有明确的logging,但我对Stream.skip上的这个评论(我大大缩短了)的解释:

skip()[…]在有序并行stream水线上可能会相当昂贵,因为skip(n)被限制跳过不仅仅是任何n个元素,而是碰到次序中的前n个元素。 […] emovingsorting约束可能会导致并行pipe道skip()的显着加速

问题在于,您正在使用并行stream和forEach并且您期望跳过操作依赖于正确的元素顺序,这是不是这里的情况。 摘自forEach文档:

对于并行streampipe线来说,这个操作并不保证尊重stream的遇到顺序,因为这样做会牺牲并行性的好处。

我想基本上会发生什么是跳过操作首先在第二行,而不是第一行。 如果你使stream顺序或使用forEachOrdered你可以看到,然后它会产生预期的结果。 另一种方法是使用收集器 。

让我引用一些相关的内容 – skip的Javadoc:

虽然skip()对顺序streampipe线来说通常是一个廉价的操作,但是在有序的并行stream水线上,尤其是对于较大的n值,可能会相当昂贵,因为skip(n)被限制跳过不仅仅是任何n个元素,而是前n遇到命令中的元素。

现在可以肯定的是, Files.lines() 具有定义良好的碰到次序,并且是一个ORDEREDstream(如果不是这样,即使在顺序操作中碰到次序匹配文件次序也不能保证),因此它保证结果stream将确定性地包含您的示例中的第二行。

无论是否有其他的东西,保证肯定是存在的。

我有一个想法如何解决这个问题,这是我在前面的讨论中看不到的。 您可以重新创buildstream分割pipe道到两个pipe道,同时保持整个事情懒惰。

 public static <T> Stream<T> recreate(Stream<T> stream) { return StreamSupport.stream(stream.spliterator(), stream.isParallel()) .onClose(stream::close); } public static void main(String[] args) { recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines() .skip(1).parallel()).forEach(System.out::println); } 

当您从初始stream分割器重新创buildstream时,则可以有效地创build新的stream水线。 在大多数情况下, recreate将作为no-op ,但是第一个和第二个pipe道不共享parallelunordered状态。 所以,即使你正在使用forEach (或任何其他无序的terminal操作),只有第二个stream变成无序。

内部非常类似的事情是串stream你的stream与一个空的stream:

 Stream.concat(Stream.empty(), new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")) .lines().skip(1).parallel()).forEach(System.out::println); 

虽然它有更多的开销。