Apache Spark:map vs mapPartitions?
RDD的 map
和mapPartitions
方法有什么mapPartitions
? mapPartitions
map
行为像map
或像mapPartitions
? 谢谢。
(编辑)即两者之间有什么区别(语义上或执行上)
def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, preservesPartitioning = true) }
和:
def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.map(fn) }
RDD的地图和mapPartitions方法有什么区别?
方法映射通过应用函数将源RDD的每个元素转换为结果RDD的单个元素。 mapPartitions将源RDD的每个分区转换为结果的多个元素(可能没有)。
平面地图的行为像地图或像mapPartitions?
flatMap也不能处理单个元素(如map
)并产生多个结果元素(如mapPartitions
)。
进出口。 小费 :
每当你进行重量级的初始化时,应该为许多
RDD
元素执行一次,而不是每个RDD
元素执行一次,如果这个初始化(例如从第三方库创build对象)不能被序列化(这样Spark就可以通过集群到工作节点),请使用mapPartitions()
而不是map()
。mapPartitions()
为每个工作任务/线程/分区提供一次初始化,而不是每个RDD
数据元素执行一次,如下所示。
val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })
Q2。
flatMap
行为如同map还是像mapPartitions
?
是。 请参阅flatmap
示例2 ..其自我解释。
Q1。 RDD的
map
和mapPartitions
什么区别mapPartitions
map
用于在每个元素级使用的函数,而mapPartitions
在分区级执行函数。
示例场景 :如果我们在一个特定的RDD
分区中有100K个元素,那么当我们使用map
时,我们将会使用映射转换使用的函数100K次。
相反,如果我们使用mapPartitions
那么我们将只调用一次特定函数,但是我们将传入所有100Klogging,并在一个函数调用中返回所有响应。
因为map
对某个特定的函数有很多次的工作,所以会有性能上的提升,特别是如果函数在每次传入所有元素(如果是mappartitions
)的时候不需要做的话,那么这个函数会花费更多的代价。
地图
在RDD的每个项目上应用转换函数,并将结果作为新的RDD返回。
列出变体
def map [U:ClassTag](f:T => U):RDD [U]
例如:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
这是一个专门的地图,每个分区只调用一次。 通过input参数(Iterarator [T]),各个分区的全部内容都可以作为值的顺序stream。 自定义函数必须返回另一个Iterator [U]。 组合的结果迭代器会自动转换为新的RDD。 请注意,由于我们select的分区,元组(3,4)和(6,7)由于下面的结果而丢失。
preservesPartitioning
分区表示input函数是否保留分区程序,除非这是一个RDD对,并且input函数不修改键,否则该分区程序应该是false
。列出变体
def mapPartitions [U:ClassTag](f:Iterator [T] => Iterator [U],preservesPartitioning:Boolean = false):RDD [U]
例1
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
例2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上面的程序也可以用flatMap写成如下。
例2使用平面图
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
结论:
mapPartitions
转换比map
更快,因为它调用你的函数一次/分区,而不是一次/元素..
地图 :
- 它一次处理一行,与MapReduce的map()方法非常相似。
- 你从每一行之后的转换中返回。
MapPartitions
- 它一次处理完整的分区。
- 处理整个分区后,只能从函数返回一次。
- 所有的中间结果都需要在内存中保存,直到处理完整个分区。
- 提供像MapReduce的setup()map()和cleanup()函数
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/