如何将多个文本文件读入单个RDD?

我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射。

JavaRDD<String> records = ctx.textFile(args[1], 1); 一次只能读取一个文件。

我想读取多个文件并将它们作为单个RDD进行处理。 怎么样?

您可以指定整个目录,使用通配符,甚至是目录和通配符的CSV。 例如:

 sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

正如Nick Chammas指出的那样,这是Hadoop的FileInputFormat的暴露,因此这也适用于Hadoop(和Scalding)。

使用union如下:

 val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds) 

然后bigRdd是所有文件的RDD。

您可以使用单个textFile调用来读取多个文件。 斯卡拉:

 sc.textFile(','.join(files)) 

你可以使用这个

首先你可以得到S3path的缓冲区/列表:

 import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala } 

现在将这个List对象传递给下面的一段代码,注意:sc是SQLContext的一个对象

 var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } } 

现在你得到了一个最终的统一RDD即DF

可选,您也可以在一个BigRDD中重新分区

 val files = sc.textFile(filename, 1).repartition(1) 

重新分区总是有效的:D

在PySpark中,我发现了另一个有用的parsing文件的方法。 也许在Scala中有一个相同的地方,但是我不太愿意提出一个有效的翻译。 实际上,这是一个带有标签的textFile调用(在下面的例子中,key = filename,value = 1)。

“Labeled”textFile

input:

 import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 

output:数组,每个条目包含使用filename-as-key和value =文件的每一行的元组。 (从技术上讲,使用这种方法,除了实际的文件path名,你也可以使用一个不同的键 – 也许是一个哈希表示来保存内存)。 即。

 [('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'), ...] 

您也可以重新组合一行的列表:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

 [('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']), ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])] 

或者将整个文件重新组合成单​​个string(在这个例子中,结果与从整个文本文件中得到的结果是一样的,但是从文件path中去除了string“file:”)。

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

有一个简单的清洁解决scheme可用。 使用wholeTextFiles()方法。 这将采取一个目录,并形成一个关键的价值对。 返回的RDD将是一对RDD。 从Spark文档中查找以下说明:

SparkContext.wholeTextFiles让你读取一个包含多个小文本文件的目录,并将它们作为(文件名,内容)对返回。 这与textFile相反,它将在每个文件中每行返回一个logging