如何将多个文本文件读入单个RDD?
我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射。
JavaRDD<String> records = ctx.textFile(args[1], 1);
一次只能读取一个文件。
我想读取多个文件并将它们作为单个RDD进行处理。 怎么样?
您可以指定整个目录,使用通配符,甚至是目录和通配符的CSV。 例如:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
正如Nick Chammas指出的那样,这是Hadoop的FileInputFormat
的暴露,因此这也适用于Hadoop(和Scalding)。
使用union
如下:
val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds)
然后bigRdd
是所有文件的RDD。
您可以使用单个textFile调用来读取多个文件。 斯卡拉:
sc.textFile(','.join(files))
你可以使用这个
首先你可以得到S3path的缓冲区/列表:
import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala }
现在将这个List对象传递给下面的一段代码,注意:sc是SQLContext的一个对象
var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } }
现在你得到了一个最终的统一RDD即DF
可选,您也可以在一个BigRDD中重新分区
val files = sc.textFile(filename, 1).repartition(1)
重新分区总是有效的:D
在PySpark中,我发现了另一个有用的parsing文件的方法。 也许在Scala中有一个相同的地方,但是我不太愿意提出一个有效的翻译。 实际上,这是一个带有标签的textFile调用(在下面的例子中,key = filename,value = 1)。
“Labeled”textFile
input:
import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
output:数组,每个条目包含使用filename-as-key和value =文件的每一行的元组。 (从技术上讲,使用这种方法,除了实际的文件path名,你也可以使用一个不同的键 – 也许是一个哈希表示来保存内存)。 即。
[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'), ...]
您也可以重新组合一行的列表:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']), ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]
或者将整个文件重新组合成单个string(在这个例子中,结果与从整个文本文件中得到的结果是一样的,但是从文件path中去除了string“file:”)。
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
有一个简单的清洁解决scheme可用。 使用wholeTextFiles()方法。 这将采取一个目录,并形成一个关键的价值对。 返回的RDD将是一对RDD。 从Spark文档中查找以下说明:
SparkContext.wholeTextFiles让你读取一个包含多个小文本文件的目录,并将它们作为(文件名,内容)对返回。 这与textFile相反,它将在每个文件中每行返回一个logging