我如何将RDD分成两个或更多的RDD?
我正在寻找一种将RDD分成两个或更多RDD的方法。 我见过的最接近的是斯卡拉星火:分解成几个RDD? 这仍然是一个单一的RDD。
如果你对SAS很熟悉,像这样:
data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run;
这导致了两个不同的数据集。 这将不得不立即坚持得到我想要的结果…
从一次转换中产生多个RDD是不可能的。 如果你想拆分一个RDD,你必须为每个拆分条件应用一个filter
。 例如:
def even(x): return x % 2 == 0 def odd(x): return not even(x) rdd = sc.parallelize(range(20)) rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果你只有一个二进制条件,计算昂贵,你可能更喜欢这样的事情:
kv_rdd = rdd.map(lambda x: (x, odd(x))) kv_rdd.cache() rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys() rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓词计算,但需要额外的传递所有数据。
重要的是要注意,只要input的RDD被正确地caching,并且没有关于数据分布的附加假设,那么在重复的filter和具有嵌套的if-else的循环之间的时间复杂度方面,没有显着差异。
在N个元素和M个条件下,您必须执行的操作数量显然与N倍M成正比。在for循环的情况下,它应该更接近(N + MN)/ 2,重复的filter恰好是NM,但是在那一天它不是别的,就是O(NM)。 你可以看看我和Jason Lenderman的讨论**来阅读一些优点和缺点。
在很高的层次上,你应该考虑两件事:
-
Spark转换是懒惰的,直到你执行一个动作,你的RDD没有实现
为什么这有关系? 回到我的例子:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果以后我决定只需要
rdd_odd
那么没有理由实现rdd_even
。如果你看看你的SAS例子来计算
work.split2
你需要实现input数据和work.split1
。 -
RDD提供了一个声明性的API。 当您使用
filter
或map
,完全取决于Spark引擎如何执行此操作。 只要传递给转换的函数是副作用,就会创build多种可能性来优化整个stream水线。
在这一天结束的时候,这个案子并不足以certificate自己的转变。
这个带滤波器模式的映射实际上是用在一个核心Spark中的。 请参阅我的答案火花RDD.randomSplit实际上是如何分割RDD和randomSplit
方法的相关部分的。
如果唯一的目标是在input上实现分割,则可以使用DataFrameWriter
partitionBy
子句,其中的文本输出格式为:
def makePairs(row: T): (String, String) = ??? data .map(makePairs).toDF("key", "value") .write.partitionBy($"key").format("text").save(...)
* Spark中只有3种基本types的转换:
- RDD [T] => RDD [T]
- RDD [T] => RDD [U]
- (RDD [T],RDD [U])=> RDD [W]
其中T,U,W可以是primefacestypes或产品 /元组(K,V)。 任何其他操作都必须使用上述的组合来表示。 您可以查看原始RDD纸张了解更多详情。
** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
***另请参见斯卡拉星火:分解收集到几个RDD?
正如上面提到的其他海报,没有单独的本地RDD转换将RDD分割,但是这里有一些“多重”操作,可以高效地模拟RDD上的各种“分割”, 而无需多次读取:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
一些特定于随机分割的方法:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
方法可从开源的silex项目中获得:
https://github.com/willb/silex
博客文章解释他们如何工作:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } } } def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } } }
正如其他地方所提到的,这些方法确实涉及对内存速度的权衡,因为它们是通过“急切地”而不是“懒惰地”计算整个分区结果来进行操作的。 因此,这些方法可能会遇到大型分区上的内存问题,而传统的惰性转换则不会。
如果使用randomSplit API调用来分割RDD,则会返回一个RDD数组。
如果您想要返回5个RDD,请input5个重量值。
例如
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4) val seedValue = 5 val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue) splitRDD(1).collect() res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
一种方法是使用自定义分区程序根据筛选条件对数据进行分区。 这可以通过扩展Partitioner
并实现与RangePartitioner
类似的东西来RangePartitioner
。
然后可以使用地图分区从分区的RDD构build多个RDD,而无需读取所有数据。
val filtered = partitioned.mapPartitions { iter => { new Iterator[Int](){ override def hasNext: Boolean = { if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) { false } else { iter.hasNext } } override def next():Int = iter.next() }
请注意,已过滤的RDD中的分区数量将与已分区的RDD中的分区数量相同,因此应该使用合并来减less此分区并删除空分区。