函数式编程(特别是Scala和Scala API)中reduce和foldLeft / fold之间的区别?
为什么Scala和Spark和foldLeft
这样的框架同时具有reduce
和foldLeft
? 那么reduce
和fold
什么区别呢?
减lessvs foldLeft
一个很大的区别是,在这个问题上没有提到任何与这个主题相关的stackoverflow答案很清楚的是, reduce
应该给一个交换monoid ,即交换和联想的操作。 这意味着操作可以并行化。
这个区别对于大数据/ MPP /分布式计算是非常重要的,甚至是整个reduce
原因。 集合可以被reduce
可以在每个块上运行,然后reduce
可以对每个块的结果进行操作 – 事实上,chunking的级别不需要停止一个级别。 我们也可以切碎每块。 这就是为什么在给定无限数量的CPU的情况下,总结列表中的整数是O(log N)。
如果你只是看签名,没有理由reduce
存在,因为你可以用foldLeft
来reduce
你所能做的一切。 foldLeft
的function大于foldLeft
的function。
但是你不能并行化一个foldLeft
,所以它的运行时间总是O(N)(即使你input一个可交换的monoid)。 这是因为假定操作不是交换幺半群,所以累计值将通过一系列连续的聚合来计算。
foldLeft
不承担交换性和结合性。 这是关联性,使得能够切断集合,并且它的交换性使得累积变得容易,因为顺序并不重要(所以聚集每个块的每个结果的顺序并不重要)。 严格地说,并发性并不是必需的,例如分布式sortingalgorithm,它只是使逻辑更容易,因为你不需要给你的块sorting。
如果你看一下Spark文档中的reduce
它特别说“…交换和关联二元运算符”
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
这里certificatereduce
不仅仅是foldLeft
一个特例
scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par scala> timeMany(1000, intParList.reduce(_ + _)) Took 462.395867 milli seconds scala> timeMany(1000, intParList.foldLeft(0)(_ + _)) Took 2589.363031 milli seconds
减less对折
现在这是它接近FP /math根源的地方,有点棘手的解释。 Reduce被正式定义为MapReduce范例的一部分,该范例处理无序集合(多重集合),Fold是用recursion的forms(参见catomorphism)来forms化定义的,因此假定集合的结构/序列。
Scalding中没有fold
方法,因为在(严格)MapReduce编程模型下,我们不能定义fold
因为块没有sorting, fold
只需要关联性而不是交换性。
简而言之,在没有积累顺序的情况下reduce
工作量, fold
需要积累顺序,并且需要零值的累积顺序不是存在区分它们的零值。 严格地说, reduce
应该工作在一个空的集合上,因为它的零值可以通过取一个任意的值x
来推导出来,然后求解x op y = x
,但是这不适用于非交换操作,因为可能存在一个左和正确的零值是不同的(即x op y != y op x
)。 当然,斯卡拉并不打算知道这个零值是什么,因为这需要做一些math(这可能是不可信的),所以只是抛出一个exception。
似乎(在词源学中经常是这样),这个原始的math意义已经失去了,因为编程中唯一明显的差别就是签名。 其结果是reduce
已经成为fold
的同义词,而不是保留它从MapReduce的原始意义。 现在这些术语经常交替使用,在大多数实现中performance相同(忽略空集合)。 我们现在要解释的怪异,就像斯巴克特有的奇怪现象一样。
所以Spark有一个fold
,但是子结果(每个分区一个)的组合顺序(在编写时)与任务完成的顺序是相同的,因此是非确定性的。 感谢@CafeFeed指出fold
使用runJob
,通过阅读代码,我意识到这是非确定性的。 Spark有一个treeReduce
但没有treeFold
,所以会产生更多的混淆。
结论
即使应用于非空序列, reduce
和fold
也有区别。 前者被定义为MapReduce编程范式的任意顺序集合( ~sergei/papers/soda10-mrc.pdf )的一部分,并且应该假设运算符除了存在联想以给出确定性的结果。 后者是根据变态来定义的,并且要求集合具有序列的概念(或者像recursion列表一样定义),因此不需要交换操作符。
在实践中,由于编程的非math性质, reduce
和fold
往往会以相同的方式运行,无论是正确的(如Scala)还是不正确的(如Spark)。
额外:我的意见在Spark API
我的观点是,如果在Spark中使用“ fold
”这个术语,那么混淆是可以避免的。 至less火花在他们的文档中有一个注释:
这与在Scala等函数式语言中为非分布式集合实现的折叠操作有些不同。
如果我没有弄错,即使Spark API不需要它,fold也需要f是可交换的。 因为分区将被聚合的顺序不能保证。 例如在下面的代码中,只有第一个打印输出是sorting的:
import org.apache.spark.{SparkConf, SparkContext} object FoldExample extends App{ val conf = new SparkConf() .setMaster("local[*]") .setAppName("Simple Application") implicit val sc = new SparkContext(conf) val range = ('a' to 'z').map(_.toString) val rdd = sc.parallelize(range) println(range.reduce(_ + _)) println(rdd.reduce(_ + _)) println(rdd.fold("")(_ + _)) }
打印:
ABCDEFGHIJKLMNOPQRSTUVWXYZ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
Scalding的另一个区别是在Hadoop中使用组合器。
想象一下,你的操作是交换性的monoid, 减less它将被应用在地图上,而不是将所有数据整理/sorting到reducer。 有了foldLeft ,情况并非如此。
pipe.groupBy('product) { _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price } // reduce is .mapReduceMap in disguise } pipe.groupBy('product) { _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price } }
在Scalding中将操作定义为monoid始终是个好习惯。
在Apache Spark中fold
在非分布式集合上fold
。 事实上, 它需要交换function来产生确定性的结果:
这与在Scala等函数式语言中为非分布式集合实现的折叠操作有些不同。 这种折叠操作可以单独应用于分区,然后将这些结果折叠到最终结果中,而不是按照某种定义的顺序将折叠应用于每个元素。 对于不可交换的函数,结果可能与应用于非分布式集合的折叠结果不同。
Mishael Rosenthal 表示了这一点 , Make42在他的评论中提出了这个build议。
有人build议 ,观察到的行为与HashPartitioner
相关,实际上parallelize
不洗牌并且不使用HashPartitioner
。
import org.apache.spark.sql.SparkSession /* Note: standalone (non-local) mode */ val master = "spark://...:7077" val spark = SparkSession.builder.master(master).getOrCreate() /* Note: deterministic order */ val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String]) require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y }) /* Note: all posible permutations */ require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
解释:
RDD 的fold
结构
def fold(zeroValue: T)(op: (T, T) => T): T = withScope { var jobResult: T val cleanOp: (T, T) => T val foldPartition = Iterator[T] => T val mergeResult: (Int, T) => Unit sc.runJob(this, foldPartition, mergeResult) jobResult }
与RDD的结构相同:
def reduce(f: (T, T) => T): T = withScope { val cleanF: (T, T) => T val reducePartition: Iterator[T] => Option[T] var jobResult: Option[T] val mergeResult = (Int, Option[T]) => Unit sc.runJob(this, reducePartition, mergeResult) jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")) }
其中runJob
是在不考虑分区顺序的情况下执行的,并且导致需要交换function。
foldPartition
和reducePartition
在处理顺序上是等价的,并且有效地(通过inheritance和委托)由reduceLeft
和foldLeft
对TraversableOnce
。
结论:RDD的fold
不能依赖于大块的顺序,需要交换性和结合性 。