Commit 3db335f

[SPARK-4835] Disable OutputSpec validation for streaming jobs.

1 parent ad0056b

File tree

3 files changed: +21 -4 lines changed

  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
  docs/configuration.md
  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 11 additions & 2 deletions

@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.DynamicVariable
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -961,7 +962,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -1039,7 +1041,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    if (!validationDisabled && self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1118,4 +1121,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
 private[spark] object PairRDDFunctions {
   val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+  /**
+   * Used by Spark Streaming in order to bypass the `spark.hadoop.validateOutputSpecs` checks
+   * for save actions launched by Spark Streaming, since the validation may break Spark Streaming's
+   * ability to recover from checkpoints. See SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
 }
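
The new flag is a scala.util.DynamicVariable, which provides a thread-local binding: withValue sets the value for the duration of a block on the current thread and restores the previous value on exit, even if the block throws. A minimal standalone sketch of that scoping behavior (the demo object and println output are illustrative only, not part of the commit):

import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Mirrors the flag added above: defaults to false.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  def save(): Unit = {
    // .value reads whatever binding is in effect on the current thread.
    if (!disableOutputSpecValidation.value) {
      println("validating output specs")
    } else {
      println("skipping output spec validation")
    }
  }

  def main(args: Array[String]): Unit = {
    save()                                    // prints "validating output specs"
    disableOutputSpecValidation.withValue(true) {
      save()                                  // prints "skipping output spec validation"
    }
    save()                                    // binding restored: validates again
  }
}

Because the binding is per-thread and scoped to the block, a streaming job that disables validation cannot leak that state into unrelated batch jobs running on other threads.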

docs/configuration.md

Lines changed: 3 additions & 1 deletion

@@ -709,7 +709,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
     used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
-    previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+    previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
+    This setting is ignored for jobs launched by the Spark Streaming scheduler, since data may need
+    to be written to a pre-existing output directory during checkpoint recovery.</td>
 </tr>
 <tr>
   <td><code>spark.hadoop.cloneConf</code></td>
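
For context, the documented property is an ordinary Spark configuration key; a user who wants the pre-existing-directory behavior for batch jobs sets it to false themselves. A minimal sketch of that (master and application name are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("validate-output-specs-demo")
  // Disables output-spec validation for ordinary save actions; after this commit,
  // jobs launched by the Spark Streaming scheduler skip the check regardless.
  .set("spark.hadoop.validateOutputSpecs", "false")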

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 7 additions & 1 deletion

@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import akka.actor.{ActorRef, Actor, Props}
 import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._
 
 
@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private class JobHandler(job: Job) extends Runnable {
     def run() {
       eventActor ! JobStarted(job)
-      job.run()
+      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+      // since we may need to write output to an existing directory during checkpoint recovery;
+      // see SPARK-4835 for more details.
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+        job.run()
+      }
       eventActor ! JobCompleted(job)
     }
   }
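
Since JobHandler.run() invokes job.run() on the scheduler's executor thread, any save action inside the job's output function sees the thread-local binding and skips the check. The scenario this protects is checkpoint recovery, where replayed batches rewrite output directories that already exist. A rough end-to-end sketch of that scenario (checkpoint path, output prefix, and the socket source are placeholders, not from the commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryDemo {
  val checkpointDir = "/tmp/checkpoint-demo" // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-recovery-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // Each batch is saved under a time-suffixed directory. After a restart, a batch
    // replayed from the checkpoint may target a directory that already exists, which
    // is exactly the case the disabled validation allows.
    ssc.socketTextStream("localhost", 9999).saveAsTextFiles("/tmp/output-demo/batch")
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a fresh start this builds a new context; on restart it recovers the context
    // from the checkpoint and re-schedules any unfinished batches.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}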
