如何从Spark的CSV文件中跳过标题?

假设我给三个文件path来读取一个Spark上下文,并且每个文件在第一行都有一个模式。 我们怎样才能从头文件中跳过模式行?

val rdd=sc.textFile("file1,file2,file3") 

现在,我们如何跳过这个rdd的标题行?

如果第一个logging中只有一个标题行,那么过滤出来的最有效的方法是:

 rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } 

如果当然有许多头文件里面有很多文件的话,这没有帮助。 事实上,你可以将三个RDD联合起来。

你也可以写一个filter ,只匹配一个可能是头的行。 这很简单,但效率较低。

 data = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(row => row != header) #filter out header 

在Spark 2.0中,CSV阅读器被构build到Spark中,因此您可以轻松加载CSV文件,如下所示:

 spark.read.option("header","true").csv("filePath") 

Spark 2.0开始,你可以使用SparkSession来完成这个工作:

 val spark = SparkSession.builder.config(conf).getOrCreate() 

然后像@SandeepPurohit所说:

 val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath) 

我希望它解决了你的问题!

PS:SparkSession是Spark 2.0中引入的新入门点,可以在spark_sql包中find

您可以分别加载每个文件,使用file.zipWithIndex().filter(_._2 > 0)过滤它们,然后合并所有文件RDD。

如果文件数量太大,联合可能会抛出一个StackOverflowExeption

使用PySpark中的filter()方法,通过筛选出第一个列名来删除头:

 # Read file (change format for other file formats) contentRDD = sc.textfile(<filepath>) # Filter out first column of the header filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>) # Check your result for i in filterDD.take(5) : print (i) 

这应该工作得很好

 def dropHeader(data: RDD[String]): RDD[String] = { data.filter(r => r!=data.first) } 

另外,你也可以使用spark-csv包(或者在Spark 2.0中,这个或多或less可以用CSV方式)。 请注意,这需要在每个文件的头(如你所愿):

 schema = StructType([ StructField('lat',DoubleType(),True), StructField('lng',DoubleType(),True)]) df = sqlContext.read.format('com.databricks.spark.csv'). \ options(header='true', delimiter="\t", treatEmptyValuesAsNulls=True, mode="DROPMALFORMED").load(input_file,schema=schema) 
 //Find header from the files lying in the directory val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{ case (fileName, stream)=> val header = new BufferedReader(new InputStreamReader(stream.open())).readLine() (fileName, header) }.collect().toMap val fileNameHeaderBr = sc.broadcast(fileNameHeader) // Now let's skip the header. mapPartition will ensure the header // can only be the first line of the partition sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter => if(iter.hasNext){ val firstLine = iter.next() println(s"Comparing with firstLine $firstLine") if(firstLine == fileNameHeaderBr.value.head._2) new WrappedIterator(null, iter) else new WrappedIterator(firstLine, iter) } else { iter } ).collect().foreach(println) class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{ var isFirstIteration = true override def hasNext: Boolean = { if (isFirstIteration && firstLine != null){ true } else{ iter.hasNext } } override def next(): String = { if (isFirstIteration){ println(s"For the first time $firstLine") isFirstIteration = false if (firstLine != null){ firstLine } else{ println(s"Every time $firstLine") iter.next() } } else { iter.next() } } } 

对于python开发者。 我已经用spark2.0进行了testing。 假设你想删除前14行。

 sc = spark.sparkContext lines = sc.textFile("s3://folder_location_of_csv/") parts = lines.map(lambda l: l.split(",")) parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 

withColumn是df函数。 所以下面将不会像上面使用的RDD风格。

 parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)