如何将数据框转换回正常的RDD在pyspark?
我需要使用
(rdd.)partitionBy(npartitions, custom_partitioner)
方法在DataFrame上不可用。 所有的DataFrame方法只涉及DataFrame结果。 那么如何从DataFrame数据创build一个RDD呢?
注意:这是从1.2.0更改(在1.3.0)。
从@dpangmao的答案更新 :方法是.rdd。 我有兴趣了解是否(a)它是公开的和(b)什么是性能影响。
那么(a)是肯定的,(b) – 你可以看到这里有很大的性能影响:必须通过调用mapPartitions来创build一个新的RDD:
在dataframe.py (注意文件名也改变了(是sql.py):
@property def rdd(self): """ Return the content of the :class:`DataFrame` as an :class:`RDD` of :class:`Row` s. """ if not hasattr(self, '_lazy_rdd'): jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) schema = self.schema def applySchema(it): cls = _create_cls(schema) return itertools.imap(cls, it) self._lazy_rdd = rdd.mapPartitions(applySchema) return self._lazy_rdd
使用这样的方法.rdd
:
rdd = df.rdd
@ dapangmao的答案是有效的,但是它并没有给出正则的spark RDD,它返回一个Row对象。 如果你想有常规的RDD格式。
尝试这个:
rdd = df.rdd.map(tuple)
要么
rdd = df.rdd.map(list)
- 我们如何使用SQL-esque“LIKE”标准来join两个Spark SQL数据框?
- 如何防止java.lang.OutOfMemoryError:在Scala编译PermGen空间?
- DataFrame(Spark 2.0中的DataSet )和Spark中的RDD之间的区别
- SPARK SQLreplace为mysql GROUP_CONCAT聚合函数
- DataFrame连接优化 – 广播散列连接
- 在Apache Spark Web UI中,“跳过舞台”是指什么?
- Spark:如何将Python与Scala或Java用户定义函数进行映射?
- 火花杀死运行应用程序
- PySpark DataFrames – 枚举的方式不转换为pandas?