Spark – 重新分区()vs coalesce()

根据学习星火

请记住,重新分区您的数据是一个相当昂贵的操作。 Spark还有一个名为coalesce()的repartition()的优化版本,它允许避免数据移动,但只有在减lessRDD分区的数量的时候。

我得到的一个区别是,重新分区()分区的数量可以增加/减less,但与coalesce()分区的数量只能减less。

如果分区分散在多台机器上,并且运行coalesce(),它如何避免数据移动?

它避免了一个完整的洗牌。 如果知道数量正在减less,那么执行者可以安全地将数据保留在最小数量的分区上,只将数据从额外的节点移出到我们保存的节点上。

所以,它会是这样的:

Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12 

然后coalesce到2个分区:

 Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6) 

请注意,节点1和节点3不需要移动原始数据。

贾斯汀的回答非常好,而且这种回应更深入。

repartitionalgorithm会进行完全混洗,并创build新的分区,数据分布均匀。 我们来创build一个数字从1到12的数据框。

 val x = (1 to 12).toList val numbersDf = x.toDF("number") 

numbersDf在我的机器上包含4个分区。

 numbersDf.rdd.partitions.size // => 4 

这里是数据在分区上的划分方式:

 Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12 

让我们用repartition方法进行全面洗牌,并在两个节点上获取这些数据。

 val numbersDfR = numbersDf.repartition(2) 

以下是我的机器上如何分配numbersDfR数据:

 Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11 

repartition方法会创build新分区,并将数据均匀分配到新分区中(对于较大的数据集,数据分配更均匀)。

coalescerepartition之间的区别

coalesce使用现有的分区来减less混洗的数据量。 repartition会创build新的分区,并进行全面的洗牌。 coalesce导致具有不同数据量的分区(有时分区大小不同), repartition导致大小相同的分区。

coalesce还是repartition更快?

coalesce可能会比repartition运行得更快,但不同大小的分区通常比同等大小的分区要慢。 过滤大数据集后,通常需要重新分区数据集。 我发现repartition速度更快,因为Spark是build立在相同大小的分区上的。

阅读这篇博客文章,如果你想了解更多的细节。

另外需要指出的是,Spark RDD的基本原理是不可变的。 重新分配或合并将创build新的RDD。 基础RDD将继续存在与其原始数量的分区。 如果用例要求在caching中保存RDD,则对于新创build的RDD也必须做同样的事情。

 scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2 

所有的答案都为这个经常被问到的问题添加了一些很棒的知识。

所以,按照这个问题的时间表传统,这里是我的2美分。

在非常具体的情况下,我发现重新分配比合并更快

在我的应用程序中,当我们估计的文件数量低于一定的阈值时,重新分区工作会更快。

这是我的意思

 if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) 

在上面的代码片段中,如果我的文件less于20个,那么coalesce会花费很长时间来完成,而重新分配的速度要快得多,所以上面的代码也是如此。

当然,这个数字(20)将取决于工人的数量和数据量。

希望有所帮助。

以简单的方式COALESCE: – 仅用于减less分区的数量,不使用数据混洗来压缩分区

REPARTITION: – 用于增加和减less分区的数量,但是洗牌发生

例:-

 val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2) 

两者都很好

但是,当我们需要在一个集群中看到输出时,我们通常会去做这两件事情,我们一起去。