Spark – 重新分区()vs coalesce()
根据学习星火
请记住,重新分区您的数据是一个相当昂贵的操作。 Spark还有一个名为coalesce()的repartition()的优化版本,它允许避免数据移动,但只有在减lessRDD分区的数量的时候。
我得到的一个区别是,重新分区()分区的数量可以增加/减less,但与coalesce()分区的数量只能减less。
如果分区分散在多台机器上,并且运行coalesce(),它如何避免数据移动?
它避免了一个完整的洗牌。 如果知道数量正在减less,那么执行者可以安全地将数据保留在最小数量的分区上,只将数据从额外的节点移出到我们保存的节点上。
所以,它会是这样的:
Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12
然后coalesce
到2个分区:
Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6)
请注意,节点1和节点3不需要移动原始数据。
贾斯汀的回答非常好,而且这种回应更深入。
repartition
algorithm会进行完全混洗,并创build新的分区,数据分布均匀。 我们来创build一个数字从1到12的数据框。
val x = (1 to 12).toList val numbersDf = x.toDF("number")
numbersDf
在我的机器上包含4个分区。
numbersDf.rdd.partitions.size // => 4
这里是数据在分区上的划分方式:
Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12
让我们用repartition
方法进行全面洗牌,并在两个节点上获取这些数据。
val numbersDfR = numbersDf.repartition(2)
以下是我的机器上如何分配numbersDfR
数据:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11
repartition
方法会创build新分区,并将数据均匀分配到新分区中(对于较大的数据集,数据分配更均匀)。
coalesce
和repartition
之间的区别
coalesce
使用现有的分区来减less混洗的数据量。 repartition
会创build新的分区,并进行全面的洗牌。 coalesce
导致具有不同数据量的分区(有时分区大小不同), repartition
导致大小相同的分区。
coalesce
还是repartition
更快?
coalesce
可能会比repartition
运行得更快,但不同大小的分区通常比同等大小的分区要慢。 过滤大数据集后,通常需要重新分区数据集。 我发现repartition
速度更快,因为Spark是build立在相同大小的分区上的。
阅读这篇博客文章,如果你想了解更多的细节。
另外需要指出的是,Spark RDD的基本原理是不可变的。 重新分配或合并将创build新的RDD。 基础RDD将继续存在与其原始数量的分区。 如果用例要求在caching中保存RDD,则对于新创build的RDD也必须做同样的事情。
scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2
所有的答案都为这个经常被问到的问题添加了一些很棒的知识。
所以,按照这个问题的时间表传统,这里是我的2美分。
在非常具体的情况下,我发现重新分配比合并更快 。
在我的应用程序中,当我们估计的文件数量低于一定的阈值时,重新分区工作会更快。
这是我的意思
if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件less于20个,那么coalesce会花费很长时间来完成,而重新分配的速度要快得多,所以上面的代码也是如此。
当然,这个数字(20)将取决于工人的数量和数据量。
希望有所帮助。
以简单的方式COALESCE: – 仅用于减less分区的数量,不使用数据混洗来压缩分区
REPARTITION: – 用于增加和减less分区的数量,但是洗牌发生
例:-
val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2)
两者都很好
但是,当我们需要在一个集群中看到输出时,我们通常会去做这两件事情,我们一起去。