HashPartitioner如何工作?
我阅读了HashPartitioner
的文档。 不幸的是,除了API调用之外,没有什么可解释的。 我假设HashPartitioner
根据密钥的哈希来分配分布式集合。 例如,如果我的数据是
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
所以分区器会把这个分区放到不同的分区中,同一个分区中的键也是一样的。 但是我不明白构造函数参数的意义
new HashPartitoner(numPartitions) //What does numPartitions do?
对于上面的数据集,如果我做了,结果会有什么不同
new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10)
那么HashPartitioner
究竟如何工作呢?
那么,让我们让你的数据集更有趣:
val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8)
我们有六个要素:
rdd.count
Long = 6
没有分区:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
和八个分区:
rdd.partitions.length
Int = 8
现在让我们定义小助手来计算每个分区的元素数量:
def countByPartition(rdd: RDD[(Int, None.type)]) = { rdd.mapPartitions(iter => Iterator(iter.length)) }
由于我们没有partitioner,所以我们的数据集在分区之间是均匀分布的( Spark中的Default Partitioning Scheme ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
现在让我们重新分区我们的数据集:
import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1))
由于传递给HashPartitioner
参数定义了一个分区的分区数量:
rddOneP.partitions.length
Int = 1
由于我们只有一个分区,它包含所有元素:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
请注意,shuffle之后的值的顺序是非确定性的。
同样的方法,如果我们使用HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
我们会得到2个分区:
rddTwoP.partitions.length
Int = 2
由于rdd
被关键数据分割,所以不会再统一分配:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
因为有三个键和只有两个不同的hashCode
mod numPartitions
值,这里没有什么意外的:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
只是为了确认以上:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
最后用HashPartitioner(7)
得到七个分区,三个非空以及两个元素:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
摘要和注释
-
HashPartitioner
采用一个定义分区数量的参数 -
值使用
hash
分配给分区。hash
函数可能因语言而异(Scala RDD可能使用hashCode
,DataSets
使用MurmurHash 3,PySpark,portable_hash
)。在这种简单的情况下,key是一个小整数,你可以假设
hash
是一个标识(i = hash(i)
)。Scala API使用
nonNegativeMod
根据计算出的散列来确定分区, -
如果密钥分配不统一,那么当组件的一部分空闲时,可能会导致密钥分配不均
-
密钥必须是可散列的。 你可以检查我的答案列表作为PySpark的reduceByKey的关键阅读PySpark的具体问题。 HashPartitioner文档强调了另一个可能的问题:
Java数组具有基于数组身份而不是其内容的哈希码,所以尝试使用HashPartitioner对RDD [Array [ ]]或RDD [(Array [ ],_)]进行分区将产生意外或不正确的结果。
-
在Python 3中,你必须确保哈希是一致的。 请看exception:string散列的随机性应该通过pyspark中的PYTHONHASHSEED平均值来禁用?
-
散列分区器既不是内在的也不是完全的。 可以将多个键分配给单个分区,并且一些分区可以保持为空。
-
请注意,当与基于REPL定义的案例类( Apache Spark中的案例类相等 )结合使用时,基于散列的方法在Scala中不起作用。
-
HashPartitioner
(或任何其他Partitioner
)混洗数据。 除非在多个操作之间重复使用分区,否则不会减less要混洗的数据量。
RDD是分布式的,这意味着它被分割成若干部分。 每个分区都可能位于不同的机器上。 带分隔numPartitions
分区器numPartitions
在放置哪个分区对(key, value)
- 创build完全
numPartitions
分区。 - 分区中的位置
(key, value)
,分区号为Hash(key) % numPartitions
HashPartitioner.getPartition
方法将一个键作为参数,并返回该键所属的分区的索引 。 分区器必须知道有效索引是什么,所以它返回正确范围内的数字。 分区数量通过numPartitions
构造函数参数指定。
该实现大致返回key.hashCode() % numPartitions
。 有关更多详细信息,请参阅Partitioner.scala 。