函数式编程(特别是Scala和Scala API)中reduce和foldLeft / fold之间的区别?

为什么Scala和Spark和foldLeft这样的框架同时具有reducefoldLeft ? 那么reducefold什么区别呢?

减lessvs foldLeft

一个很大的区别是,在这个问题上没有提到任何与这个主题相关的stackoverflow答案很清楚的是, reduce应该给一个交换monoid ,即交换和联想的操作。 这意味着操作可以并行化。

这个区别对于大数据/ MPP /分布式计算是非常重要的,甚至是整个reduce原因。 集合可以被reduce可以在每个块上运行,然后reduce可以对每个块的结果进行操作 – 事实上,chunking的级别不需要停止一个级别。 我们也可以切碎每块。 这就是为什么在给定无限数量的CPU的情况下,总结列表中的整数是O(log N)。

如果你只是看签名,没有理由reduce存在,因为你可以用foldLeftreduce你所能做的一切。 foldLeft的function大于foldLeft的function。

但是你不能并行化一个foldLeft ,所以它的运行时间总是O(N)(即使你input一个可交换的monoid)。 这是因为假定操作不是交换幺半群,所以累计值将通过一系列连续的聚合来计算。

foldLeft不承担交换性和结合性。 这是关联性,使得能够切断集合,并且它的交换性使得累积变得容易,因为顺序并不重要(所以聚集每个块的每个结果的顺序并不重要)。 严格地说,并发性并不是必需的,例如分布式sortingalgorithm,它只是使逻辑更容易,因为你不需要给你的块sorting。

如果你看一下Spark文档中的reduce它特别说“…交换和关联二元运算符”

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

这里certificatereduce不仅仅是foldLeft一个特例

 scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par scala> timeMany(1000, intParList.reduce(_ + _)) Took 462.395867 milli seconds scala> timeMany(1000, intParList.foldLeft(0)(_ + _)) Took 2589.363031 milli seconds 

减less对折

现在这是它接近FP /math根源的地方,有点棘手的解释。 Reduce被正式定义为MapReduce范例的一部分,该范例处理无序集合(多重集合),Fold是用recursion的forms(参见catomorphism)来forms化定义的,因此假定集合的结构/序列。

Scalding中没有fold方法,因为在(严格)MapReduce编程模型下,我们不能定义fold因为块没有sorting, fold只需要关联性而不是交换性。

简而言之,在没有积累顺序的情况下reduce工作量, fold需要积累顺序,并且需要零值的累积顺序不是存在区分它们的零值。 严格地说, reduce 应该工作在一个空的集合上,因为它的零值可以通过取一个任意的值x来推导出来,然后求解x op y = x ,但是这不适用于非交换操作,因为可能存在一个左和正确的零值是不同的(即x op y != y op x )。 当然,斯卡拉并不打算知道这个零值是什么,因为这需要做一些math(这可能是不可信的),所以只是抛出一个exception。

似乎(在词源学中经常是这样),这个原始的math意义已经失去了,因为编程中唯一明显的差别就是签名。 其结果是reduce已经成为fold的同义词,而不是保留它从MapReduce的原始意义。 现在这些术语经常交替使用,在大多数实现中performance相同(忽略空集合)。 我们现在要解释的怪异,就像斯巴克特有的奇怪现象一样。

所以Spark有一个fold ,但是子结果(每个分区一个)的组合顺序(在编写时)与任务完成的顺序是相同的,因此是非确定性的。 感谢@CafeFeed指出fold使用runJob ,通过阅读代码,我意识到这是非确定性的。 Spark有一个treeReduce但没有treeFold ,所以会产生更多的混淆。

结论

即使应用于非空序列, reducefold也有区别。 前者被定义为MapReduce编程范式的任意顺序集合( ~sergei/papers/soda10-mrc.pdf )的一部分,并且应该假设运算符除了存在联想以给出确定性的结果。 后者是根据变态来定义的,并且要求集合具有序列的概念(或者像recursion列表一样定义),因此不需要交换操作符。

在实践中,由于编程的非math性质, reducefold往往会以相同的方式运行,无论是正确的(如Scala)还是不正确的(如Spark)。

额外:我的意见在Spark API

我的观点是,如果在Spark中使用“ fold ”这个术语,那么混淆是可以避免的。 至less火花在他们的文档中有一个注释:

这与在Scala等函数式语言中为非分布式集合实现的折叠操作有些不同。

如果我没有弄错,即使Spark API不需要它,fold也需要f是可交换的。 因为分区将被聚合的顺序不能保证。 例如在下面的代码中,只有第一个打印输出是sorting的:

 import org.apache.spark.{SparkConf, SparkContext} object FoldExample extends App{ val conf = new SparkConf() .setMaster("local[*]") .setAppName("Simple Application") implicit val sc = new SparkContext(conf) val range = ('a' to 'z').map(_.toString) val rdd = sc.parallelize(range) println(range.reduce(_ + _)) println(rdd.reduce(_ + _)) println(rdd.fold("")(_ + _)) } 

打印:

ABCDEFGHIJKLMNOPQRSTUVWXYZ

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

Scalding的另一个区别是在Hadoop中使用组合器。

想象一下,你的操作是交换性的monoid, 减less它将被应用在地图上,而不是将所有数据整理/sorting到reducer。 有了foldLeft ,情况并非如此。

 pipe.groupBy('product) { _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price } // reduce is .mapReduceMap in disguise } pipe.groupBy('product) { _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price } } 

在Scalding中将操作定义为monoid始终是个好习惯。

在Apache Spark中fold在非分布式集合上fold 。 事实上, 它需要交换function来产生确定性的结果:

这与在Scala等函数式语言中为非分布式集合实现的折叠操作有些不同。 这种折叠操作可以单独应用于分区,然后将这些结果折叠到最终结果中,而不是按照某种定义的顺序将折叠应用于每个元素。 对于不可交换的函数,结果可能与应用于非分布式集合的折叠结果不同。

Mishael Rosenthal 表示了这一点 , Make42在他的评论中提出了这个build议。

有人build议 ,观察到的行为与HashPartitioner相关,实际上parallelize不洗牌并且不使用HashPartitioner

 import org.apache.spark.sql.SparkSession /* Note: standalone (non-local) mode */ val master = "spark://...:7077" val spark = SparkSession.builder.master(master).getOrCreate() /* Note: deterministic order */ val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String]) require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y }) /* Note: all posible permutations */ require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24) 

解释:

RDD 的fold结构

 def fold(zeroValue: T)(op: (T, T) => T): T = withScope { var jobResult: T val cleanOp: (T, T) => T val foldPartition = Iterator[T] => T val mergeResult: (Int, T) => Unit sc.runJob(this, foldPartition, mergeResult) jobResult } 

与RDD的结构相同:

 def reduce(f: (T, T) => T): T = withScope { val cleanF: (T, T) => T val reducePartition: Iterator[T] => Option[T] var jobResult: Option[T] val mergeResult = (Int, Option[T]) => Unit sc.runJob(this, reducePartition, mergeResult) jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) } 

其中runJob是在不考虑分区顺序的情况下执行的,并且导致需要交换function。

foldPartitionreducePartition在处理顺序上是等价的,并且有效地(通过inheritance和委托)由reduceLeftfoldLeftTraversableOnce

结论:RDD的fold不能依赖于大块的顺序,需要交换性和结合性