如何打印RDD的内容?
我试图将集合的内容打印到Spark控制台。
我有一个types:
linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]
我使用这个命令:
scala> linesWithSessionId.map(line => println(line))
但是这是印刷的:
res1:org.apache.spark.rdd.RDD [Unit] = MappedRDD [4] at map at:19
如何将RDD写入控制台或将其保存到磁盘,以便查看其内容?
如果你想查看一个RDD的内容,一种方法是使用collect()
:
myRDD.collect().foreach(println)
但是,当RDD拥有数十亿行时,这并不是一个好主意。 使用take()
只需要几个打印输出:
myRDD.take(n).foreach(println)
map
函数是一个转换 ,这意味着在您对其执行操作之前,Spark不会实际评估您的RDD。
要打印它,你可以使用foreach
(这是一个动作):
linesWithSessionId.foreach(println)
要将其写入磁盘,您可以使用RDD API中的一个saveAs...
函数(仍然是动作)
如果你在集群上运行这个,那么println
将不会打印回你的上下文。 您需要将RDD
数据带到会话中。 要做到这一点,你可以强制它到本地数组,然后打印出来:
linesWithSessionId.toArray().foreach(line => println(line))
你可以将你的RDD
转换成DataFrame
然后show()
它。
// For implicit conversion from RDD to DataFrame import sQLContext.implicits._ fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)]) // convert to DF then show it fruits.toDF().show()
这将显示您的数据的前20行,所以您的数据的大小不应该是一个问题。
+------+---+ | _1| _2| +------+---+ | apple| 1| |banana| 2| |orange| 17| +------+---+
在python中
linesWithSessionIdCollect = linesWithSessionId.collect() linesWithSessionIdCollect
这将打印RDD的所有内容
你也可以保存为一个文件: rdd.saveAsTextFile("alicia.txt")
在myRDD.foreach(println)
和myRDD.collect().foreach(println)
(不仅是“collect”,还有其他动作)之间可能存在许多架构差异。 我看到的一个差异是在执行myRDD.foreach(println)
,输出将以随机顺序进行。 例如:如果我的rdd来自每行都有一个数字的文本文件,则输出将具有不同的顺序。 但是当我做myRDD.collect().foreach(println)
,顺序就像文本文件一样。
你可以,而不是每次打字。
[1]在Spark Shell中创build一个通用的打印方法。
def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)
[2]甚至更好,使用implicits,你可以添加函数到RDD类来打印它的内容。
implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) { def print = rdd.foreach(println) }
用法示例:
val rdd = sc.parallelize(List(1,2,3,4)).map(_*2) p(rdd) // 1 rdd.print // 2
输出:
2 6 4 8
PS。 这些只有在本地模式和less量数据集的情况下才有意义。 否则,您将无法在客户端上看到结果,或者由于大数据集结果而导致内存不足。