并行stream,收集器和线程安全

请参阅下面的简单示例,其中列出了每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c"); Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

最后, wordsCount{a=2, b=1, c=1}

但是我的stream量非常大,我想平行工作,所以我写道:

 Map<String, Integer> wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

不过,我注意到wordsCount是一个简单的HashMap所以我不知道是否需要明确要求一个并发映射来确保线程安全:

 Map<String, Integer> wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j)); 

非并发收集器可以安全地用于并行stream吗?还是仅在从并行stream收集时使用并发版本?

非并发收集器可以安全地用于并行stream吗?还是仅在从并行stream收集时使用并发版本?

在并行stream的collect操作中使用非并发收集器是安全的。

Collector界面的规范中,有六个要点的部分是这样的:

对于非并发收集器,从结果供应者,累加器或组合器函数返回的任何结果都必须串行限制线程。 这使收集能够并行进行,而收集器不需要实施任何额外的同步。 Reduce实现必须pipe理input是否正确分区,分区是否被隔离处理,并且只有在累积完成后才会发生组合。

这意味着Collectors类提供的各种实现可以用于并行stream,即使其中一些实现可能不是并发收集器。 这也适用于您可能实现的任何您自己的非并发收集器。 如果您的collections家不干扰stream源,不受副作用,独立订单等的影响,他们可以安全地使用平行stream。

我也推荐阅读java.util.stream包文档的Mutable Reduction部分。 在本节的中间是一个声明为可并行化的示例,但它将结果收集到不是线程安全的ArrayList

这种方式的工作原理是以并行收集器结尾的并行stream确保不同的线程始终在中间结果集合的不同实例上运行。 这就是为什么一个收集器有一个Supplier函数,用于创build与线程一样多的中间集合,所以每个线程都可以累积到自己的集合中。 当中间结果被合并时,它们在线程之间被安全地切换,并且在任何时候只有一个线程合并任何一对中间结果。

所有收集者如果遵循规范中的规则,则可以安全地并行或顺序运行。 平行准备是这里devise的关键部分。

并发和非并发收集器之间的区别与并行化的方法有关。

普通(非并发)收集器通过合并子结果来运行。 因此,源代码被分割成一堆块,每个块被收集到一个结果容器(如列表或地图)中,然后将子结果合并到一个更大的结果容器中。 这是安全的,保存顺序,但对于某些types的容器,特别是地图,可能是昂贵的,因为按键合并两个地图通常是昂贵的。

一个并发的收集器会创build一个结果容器,其插入操作保证是线程安全的,并且从多个线程中向其发送元素。 使用像ConcurrentHashMap这样高度并发的结果容器,这种方法可能比合并普通HashMap更好。

所以,并发collections者对他们的普通collections者是严格的优化。 而且他们不是没有成本的; 因为元素被从多个线程中抨击,并发的收集者一般不能保存遇到的命令。 (但是,你通常不关心 – 创build一个字数直方图时,你不关心你首先计算哪个“foo”实例。)

使用并行stream的非并发集合和非primefaces计数器是安全的。

如果你看一下Stream :: collect的文档,你会发现下面的段落:

reduce(Object, BinaryOperator) ,可以并行收集操作,而不需要额外的同步。

而对于Stream :: reduce方法:

虽然这可能看起来是一个更简单的方法来执行聚合,而不是简单地将循环中的运行总量进行变异,但是减less操作可以更加平滑地进行并行处理,而不需要额外的同步,并且大大降低了数据竞争的风险。

这可能有点令人惊讶。 但是请注意, 并行stream是基于fork-join模型的 。 这意味着并发执行如下:

  • 将序列拆分成大小相同的两部分
  • 分别处理每个部分
  • 收集两部分的结果并将其结合成一个结果

在第二步中,三个步骤recursion地应用于子序列。

一个例子应该说清楚。 该

 IntStream.range(0, 4) .parallel() .collect(Trace::new, Trace::accumulate, Trace::combine); 

Trace类的唯一目的是logging构造函数和方法调用。 如果你执行这个语句,它会打印下面几行:

 thread: 9 / operation: new thread: 10 / operation: new thread: 10 / operation: accumulate thread: 1 / operation: new thread: 1 / operation: accumulate thread: 1 / operation: combine thread: 11 / operation: new thread: 11 / operation: accumulate thread: 9 / operation: accumulate thread: 9 / operation: combine thread: 9 / operation: combine 

你可以看到,已经创build了四个Trace对象, 累积在每个对象上被调用一次,并且已经使用了三次将这四个对象合并为一个。 每个对象一次只能被一个线程访问。 这使得代码是线程安全的,这同样适用于Collectors :: toMap方法。