如何覆盖spark中的输出目录
我有一个火花stream应用程序,每分钟产生一个数据集。 我需要保存/覆盖处理的数据的结果。
当我试图覆盖数据集org.apache.hadoop.mapred.FileAlreadyExistsException停止执行。
我设置了Spark属性set("spark.files.overwrite","true")
,但没有运气。
如何覆盖或预先删除火花文件?
参数spark.files.overwrite
的文档说:“是否覆盖通过SparkContext.addFile()
添加的文件,当目标文件存在,并且其内容与源文件不匹配”。 所以它对saveAsTextFiles方法没有影响。
你可以在保存文件之前做到这一点:
val hadoopConf = new org.apache.hadoop.conf.Configuration() val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf) try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Aas在这里解释: http : //apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696。 HTML
更新:build议使用数据Dataframes
,加上类似... .write.mode(SaveMode.Overwrite) ...
对于旧版本尝试
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sc = SparkContext(yourSparkConf)
在1.1.0中,您可以使用带有–conf标志的spark-submit脚本来设置conf设置。
警告:根据@piggybox在Spark中有一个错误,它将只覆盖需要写入文件的文件,任何其他文件都不会被删除。
从pyspark.sql.DataFrame.save文档(当前位于1.3.1),您可以在保存DataFrame时指定mode='overwrite'
:
myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
我已经validation过,这甚至会删除分区文件。 因此,如果您最初说了10个分区/文件,然后用只有6个分区的DataFrame覆盖了该文件夹,则生成的文件夹将具有6个分区/文件。
有关模式选项的更多信息,请参阅Spark SQL文档 。
由于df.save(path, source, mode)
已被弃用( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )
使用df.write.format(source).mode("overwrite").save(path)
其中df.write是DataFrameWriter
'source'可以是(“com.databricks.spark.avro”|“parquet”|“json”)
df.write.mode('overwrite')。parquet(“/ output / folder / path”)如果你想用python覆盖parquet文件, 这是在火花1.6.2。 API可能在更高版本中有所不同
val jobName = "WordCount"; //overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false") val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false"); val sc = new SparkContext(conf)
这个保存function的重载版本适用于我:
yourDF.save(outputPath,org.apache.spark.sql.SaveMode.valueOf(“覆盖”))
上面的例子会覆盖现有的文件夹。 savemode也可以使用这些参数( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
追加 :追加模式意味着将DataFrame保存到数据源时,如果data / table已经存在,则DataFrame的内容被追加到现有的数据中。
ErrorIfExists :ErrorIfExists模式意味着将DataFrame保存到数据源时,如果数据已经存在,则预计会抛出exception。
忽略 :忽略模式意味着将DataFrame保存到数据源时,如果数据已经存在,则保存操作不会保存DataFrame的内容,也不会更改现有数据。
如果您愿意使用自己的自定义输出格式,那么您也可以使用RDD获得所需的行为。
看看下面的类: FileOutputFormat , FileOutputCommitter
在文件输出格式中,您有一个名为checkOutputSpecs的方法,它检查输出目录是否存在。 在FileOutputCommitter中,您有通常将数据从临时目录传输到最终位置的commitJob。
我还没有能够validation它(这样做,只要我有几分钟的空闲时间),但理论上:如果我扩展FileOutputFormat并覆盖checkOutputSpecs一个方法,不抛出exception的目录已经存在,并调整我的自定义输出提交程序的commitJob方法来执行我想要的任何逻辑(例如,覆盖一些文件,追加其他),而我也可以用RDD实现所需的行为。
输出格式被传递给:saveAsNewAPIHadoopFile(这也是saveAsTextFile调用的方法,以实际保存文件)。 输出提交者在应用程序级别configuration。