使用spark-csv编写单个CSV文件
我正在使用https://github.com/databricks/spark-csv ,我想写一个单一的CSV,但不能够,它正在做一个文件夹。
需要一个带path和文件名参数的scala函数,并写入该CSV文件。
它正在创build一个包含多个文件的文件夹,因为每个分区都是单独保存的。 如果您需要单个输出文件(仍在文件夹中),则可以在保存之前repartition
dataframe:
df // place all data in a single partition .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv")
所有数据将被写入mydata.csv/part-00000
。 在使用此选项之前,请确保您了解正在进行的操作以及将所有数据传输到单个工作人员的成本是多less 。 如果将分布式文件系统与复制一起使用,则数据将被多次传输 – 首先将其提交给单个工作人员,然后分布在存储节点上。
或者,您可以保留getmerge
的代码,然后使用通用工具(如cat
或HDFS getmerge
简单地合并所有部分。
我可能会在这里稍微晚一些,但是使用coalesce(1)
或者repartition(1)
可能适用于小型数据集,但是大型数据集将全部投入到一个节点上的一个分区中。 这很可能会导致OOM错误,或者最多缓慢地进行。
我强烈build议您使用Hadoop API中的FileUtil.copyMerge()
函数。 这将把输出合并成一个文件。
编辑 – 这有效地将数据传递给驱动程序而不是执行程序节点。 如果一个执行程序比驱动程序有更多的内存供使用,那么Coalesce()
就可以了。
编辑2:copyMerge()在Hadoop 3.0中被删除。 有关如何使用最新版本的更多信息,请参阅以下堆栈溢出文章: Hadoop如何在Hadoop 3.0中执行CopyMerge
如果你正在用HDFS运行Spark,我通常通过编写csv文件来解决问题,并利用hdfs来进行合并。 我正在Spark(1.6)中直接执行此操作:
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist()
不记得我在哪里学到这个技巧,但它可能适合你。 andrew,bytesumo.com
如果您正在使用Databricks,并且可以将所有数据放入到一名工作人员的RAM中(因此可以使用.coalesce(1)),则可以使用dbfs查找并移动生成的CSV文件:
val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don't use this, but you may want to. .option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true)
如果你的文件不适合工人的RAM,你可能要考虑混沌平衡的build议使用FileUtils.copyMerge() 。 我没有这样做,也不知道是否可以,例如在S3上。
这个答案build立在以前对这个问题的回答以及我自己对所提供的代码片段的testing上。 我最初把它发布到Databricks,并在这里重新发布。
dbfs的rmrecursion选项的最佳文档是在Databricks论坛上find的。
在保存之前重新分区/合并到1个分区(您仍然会得到一个文件夹,但是它将包含一个部分文件)
你可以使用rdd.coalesce(1, true).saveAsTextFile(path)
它会将数据作为单独文件存储在path/ part-00000中
还有一种使用Java的方法
import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}