如何打印RDD的内容?

我试图将集合的内容打印到Spark控制台。

我有一个types:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3] 

我使用这个命令:

 scala> linesWithSessionId.map(line => println(line)) 

但是这是印刷的:

res1:org.apache.spark.rdd.RDD [Unit] = MappedRDD [4] at map at:19

如何将RDD写入控制台或将其保存到磁盘,以便查看其内容?

如果你想查看一个RDD的内容,一种方法是使用collect()

 myRDD.collect().foreach(println) 

但是,当RDD拥有数十亿行时,这并不是一个好主意。 使用take()只需要几个打印输出:

 myRDD.take(n).foreach(println) 

map函数是一个转换 ,这意味着在您对其执行操作之前,Spark不会实际评估您的RDD。

要打印它,你可以使用foreach (这是一个动作):

 linesWithSessionId.foreach(println) 

要将其写入磁盘,您可以使用RDD API中的一个saveAs...函数(仍然是动作)

如果你在集群上运行这个,那么println将不会打印回你的上下文。 您需要将RDD数据带到会话中。 要做到这一点,你可以强制它到本地数组,然后打印出来:

 linesWithSessionId.toArray().foreach(line => println(line)) 

你可以将你的RDD转换成DataFrame然后show()它。

 // For implicit conversion from RDD to DataFrame import sQLContext.implicits._ fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)]) // convert to DF then show it fruits.toDF().show() 

这将显示您的数据的前20行,所以您的数据的大小不应该是一个问题。

 +------+---+ | _1| _2| +------+---+ | apple| 1| |banana| 2| |orange| 17| +------+---+ 

在python中

  linesWithSessionIdCollect = linesWithSessionId.collect() linesWithSessionIdCollect 

这将打印RDD的所有内容

你也可以保存为一个文件: rdd.saveAsTextFile("alicia.txt")

myRDD.foreach(println)myRDD.collect().foreach(println) (不仅是“collect”,还有其他动作)之间可能存在许多架构差异。 我看到的一个差异是在执行myRDD.foreach(println) ,输出将以随机顺序进行。 例如:如果我的rdd来自每行都有一个数字的文本文件,则输出将具有不同的顺序。 但是当我做myRDD.collect().foreach(println) ,顺序就像文本文件一样。

你可以,而不是每次打字。

[1]在Spark Shell中创build一个通用的打印方法。

 def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println) 

[2]甚至更好,使用implicits,你可以添加函数到RDD类来打印它的内容。

 implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) { def print = rdd.foreach(println) } 

用法示例:

 val rdd = sc.parallelize(List(1,2,3,4)).map(_*2) p(rdd) // 1 rdd.print // 2 

输出:

 2 6 4 8 

PS。 这些只有在本地模式和less量数据集的情况下才有意义。 否则,您将无法在客户端上看到结果,或者由于大数据集结果而导致内存不足。