From 94d1b3bef2945eaf53c6801b2e476ae7628c64a4 Mon Sep 17 00:00:00 2001
From: Bijay Singh Bisht
Date: Tue, 3 Jun 2014 09:35:01 -0700
Subject: [PATCH] Added Java system variable spark.hadoop.checkoutputspec.

Set it to false via SPARK_JAVA_OPTS="-Dspark.hadoop.checkoutputspec=false" to
allow overwriting the output directory. This is required on restart to
reprocess the input after a job has failed without committing the output
directory.
---
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef79261d..82454110b1f0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -688,10 +688,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val wrappedConf = new SerializableWritable(job.getConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
+    val env = SparkEnv.get
 
     if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
-      jobFormat.checkOutputSpecs(job)
+      if (env.conf.getBoolean("spark.hadoop.checkoutputspec", true)) {
+        jobFormat.checkOutputSpecs(job)
+      }
     }
 
     def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
@@ -741,6 +744,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outputFormatInstance = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
+    val env = SparkEnv.get
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -757,8 +761,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      if (env.conf.getBoolean("spark.hadoop.checkoutputspec", true)) {
+        val ignoredFs = FileSystem.get(conf)
+        conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      }
     }
 
     val writer = new SparkHadoopWriter(conf)
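
For reference, a minimal sketch of how the new flag would be used when
restarting a failed job. The app name and HDFS paths are illustrative and not
part of this patch; since the patched code reads the flag from SparkEnv's
SparkConf, it can be set programmatically as below, or on the driver via
SPARK_JAVA_OPTS="-Dspark.hadoop.checkoutputspec=false" as described in the
commit message (SparkConf picks up spark.* system properties).

// Sketch: re-run a failed job over the output directory left behind by
// the previous, uncommitted attempt. Paths and app name are illustrative.
import org.apache.spark.{SparkConf, SparkContext}

object RestartExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("restart-example")
      // Skip FileOutputFormat.checkOutputSpecs so a pre-existing output
      // directory does not abort the job.
      .set("spark.hadoop.checkoutputspec", "false")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs:///data/input")   // illustrative path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Without the flag, checkOutputSpecs would reject this call because
    // hdfs:///data/output already exists from the failed attempt.
    counts.saveAsTextFile("hdfs:///data/output")
    sc.stop()
  }
}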