用Spark加载CSV文件

我是Spark的新手,我试图用Spark从文件中读取CSV数据。 这是我在做什么:

sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() 

我希望这个调用给我一个我的文件的两个第一列的列表,但我得到这个错误:

 File "<ipython-input-60-73ea98550983>", line 1, in <lambda> IndexError: list index out of range 

虽然我的CSV文件不止一列。

你确定所有的行至less有2列? 你可以尝试一下,只是为了检查?

 sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)>1) \ .map(lambda line: (line[0],line[1])) \ .collect() 

或者,你可以打印罪魁祸首(如果有的话):

 sc.textFile("file.csv") \ .map(lambda line: line.split(",")) \ .filter(lambda line: len(line)<=1) \ .collect() 

Spark 2.0.0+

您可以直接使用内置的csv数据源:

 spark.read.csv( "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema ) 

要么

 (spark.read .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .csv("some_input_file.csv")) 

不包括任何外部依赖。

Spark <2.0.0

而不是手动parsing,这在一般情况下是不平凡的,我会build议spark-csv

确保Spark CSV包含在path中( --packages ,– --jars ,– --driver-class-path

并加载您的数据如下:

 (df = sqlContext .read.format("com.databricks.spark.csv") .option("header", "true") .option("inferschema", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv")) 

它可以处理加载,模式推断,删除格式不正确的行,并且不需要将数据从Python传递给JVM。

注意

如果您知道该模式,最好避免模式推断并将其传递给DataFrameReader 。 假设你有三列 – 整数,双精度和string:

 from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType schema = StructType([ StructField("A", IntegerType()), StructField("B", DoubleType()), StructField("C", StringType()) ]) (sqlContext .read .format("com.databricks.spark.csv") .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv")) 

简单地用逗号拆分也会拆分字段内的逗号(例如a,b,"1,2,3",c ),所以不build议这样做。 如果你想使用DataFrames API, zero323的答案是好的,但是如果你想坚持基于Spark,你可以用csv模块parsing基本Python中的csvs:

 # works for both python 2 and 3 import csv rdd = sc.textFile("file.csv") rdd = rdd.mapPartitions(lambda x: csv.reader(x)) 

编辑:@muon在评论中提到,这将像对待任何其他行的标题,所以你需要手动提取它。 例如, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (确保在filter评估之前不要修改header )。 但是在这一点上,你最好使用内置的csvparsing器。

还有另外一个select是使用Pandas读取CSV文件,然后将Pandas DataFrame导入到Spark中。

例如:

 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) pandas_df = pd.read_csv('file.csv') # assuming the file contains a header # pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header s_df = sql_sc.createDataFrame(pandas_df) 
 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|"); print(df.collect()) 

这与JP Mercier最初提出的关于使用pandas的build议是一致的,但有一个重要的修改:如果你把数据读入大pandas,它应该更具有可塑性。 意思是说,你可以parsing一个比Pandas实际上可以处理的文件大得多的文件,并将其传递给Spark。 (这也回答了为什么一个人想要使用Spark的评论,如果他们可以将任何东西加载到Pandas中)。

 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext('local','example') # if using locally sql_sc = SQLContext(sc) Spark_Full = sc.emptyRDD() chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000) # if you have headers in your csv file: headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns) for chunky in chunk_100k: Spark_Full += sc.parallelize(chunky.values.tolist()) YourSparkDataFrame = Spark_Full.toDF(headers) # if you do not have headers, leave empty instead: # YourSparkDataFrame = Spark_Full.toDF() YourSparkDataFrame.show() 

如果你的csv数据在任何字段中都不包含换行符,你可以使用textFile()加载你的数据并parsing它

 import csv import StringIO def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name1", "name2"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) 

现在,对于任何一般的csv文件还有另一个选项: https : //github.com/seahboonsiew/pyspark-csv ,如下所示:

假设我们有以下的上下文

 sc = SparkContext sqlCtx = SQLContext or HiveContext 

首先,使用SparkContext将pyspark-csv.py分发给执行者

 import pyspark_csv as pycsv sc.addPyFile('pyspark_csv.py') 

通过SparkContext读取csv数据并将其转换为DataFrame

 plaintext_rdd = sc.textFile('hdfs://xxxx/blah.csv') dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd) 

您通常不想尝试手动parsingCSV。 这是一个无依赖的解决scheme,可以正确处理像引用string一样的任何转义。

 import csv # Python standard CSV library def csv_to_rdd(csv_filename): return sc.textFile(csv_filename) \ .map(lambda line: tuple(list(csv.reader([line]))[0])) 

如果你想加载csv作为一个数据框,那么你可以做到以下几点:

 from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv') \ .options(header='true', inferschema='true') \ .load('sampleFile.csv') # this is your csv file 

它为我工作得很好。

 import pandas as pd data1 = pd.read_csv("test1.csv") data2 = pd.read_csv("train1.csv")