@@ -689,7 +689,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -755,7 +756,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
Contributor

Hey the fact that self.conf and conf have the same name is pretty bad and it could easily lead to issues down the road. I realize it's not part of your patch, but would you mind changing the input to be called hadoopConf so that there is no overloading?

def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {

hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
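
To make the shadowing concern concrete, here is a tiny hypothetical sketch; the class, field, and method names below are illustrative only, not the actual PairRDDFunctions code:

import org.apache.hadoop.conf.Configuration

// Hypothetical illustration: a parameter named `conf` hides the member `conf`,
// so the Spark-side setting can only be reached through an explicit `this.conf`
// (the `self.conf` of the real code), which is easy to forget.
// Renaming the parameter to hadoopConf, as suggested, removes the ambiguity.
class SaveOps(sparkSettings: Map[String, String]) {
  def conf: Map[String, String] = sparkSettings  // analogue of self.conf

  def saveAsDataset(conf: Configuration): Unit = {  // Hadoop conf shadows the member above
    val validate = this.conf.getOrElse("spark.hadoop.validateOutputSpecs", "true").toBoolean
    println(s"validate=$validate, fs.defaultFS=${conf.get("fs.defaultFS")}")
  }
}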

Contributor

Actually maybe that can be in a separate patch. I can merge this and we can add it later.

+        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (new Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td>spark.hadoop.validateOutputSpecs</td>
+  <td>true</td>
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+  used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+  output directories. We recommend that users do not disable this except when trying to achieve compatibility with
+  previous versions of Spark; instead, simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
</table>
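
As a concrete illustration of the two approaches described in the new row (deleting the output directory by hand versus disabling the check), here is a minimal sketch; the application name and output path are placeholders, and the config setting mirrors the new FileSuite tests above:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object OverwriteOutputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("overwrite-example").setMaster("local"))
    val out = "/tmp/overwrite-example/output"  // placeholder path

    // Option 1 (recommended): delete the pre-existing output directory by hand
    // with Hadoop's FileSystem API, then save as usual.
    val fs = FileSystem.get(URI.create(out), sc.hadoopConfiguration)
    fs.delete(new Path(out), true)  // recursive; returns false if the path does not exist
    sc.parallelize(Seq((1, "a"), (2, "b"))).saveAsTextFile(out)

    // Option 2 (compatibility with older Spark only): turn the check off entirely,
    // exactly as the new FileSuite tests do:
    //   new SparkConf().set("spark.hadoop.validateOutputSpecs", "false")

    sc.stop()
  }
}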

#### Networking