
Commit a0bb88e

JoshRosen authored and tdas committed
[SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs
This patch disables output spec. validation for jobs launched through Spark Streaming, since this interferes with checkpoint recovery.

Hadoop OutputFormats have a `checkOutputSpecs` method which performs certain checks prior to writing output, such as checking whether the output directory already exists. SPARK-1100 added checks for FileOutputFormat, SPARK-1677 (#947) added a SparkConf configuration to disable these checks, and SPARK-2309 (#1088) extended these checks to run for all OutputFormats, not just FileOutputFormat.

In Spark Streaming, we might have to re-process a batch during checkpoint recovery, so `save` actions may be called multiple times. In addition to `DStream`'s own save actions, users might use `transform` or `foreachRDD` and call the `RDD` and `PairRDD` save actions. When output spec. validation is enabled, the second calls to these actions will fail due to existing output.

This patch automatically disables output spec. validation for jobs submitted by the Spark Streaming scheduler. This is done by using Scala's `DynamicVariable` to propagate the bypass setting without having to mutate SparkConf or introduce a global variable.

Author: Josh Rosen <[email protected]>

Closes #3832 from JoshRosen/SPARK-4835 and squashes the following commits:

36eaf35 [Josh Rosen] Add comment explaining use of transform() in test.
6485cf8 [Josh Rosen] Add test case in Streaming; fix bug for transform()
7b3e06a [Josh Rosen] Remove Streaming-specific setting to undo this change; update conf. guide
bf9094d [Josh Rosen] Revise disableOutputSpecValidation() comment to not refer to Spark Streaming.
e581d17 [Josh Rosen] Deduplicate isOutputSpecValidationEnabled logic.
762e473 [Josh Rosen] [SPARK-4835] Disable validateOutputSpecs for Spark Streaming jobs.

(cherry picked from commit 939ba1f)

Signed-off-by: Tathagata Das <[email protected]>
1 parent 67e2eb6 commit a0bb88e
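
The core mechanism here is Scala's `DynamicVariable`, which binds a value only for the dynamic extent of a block on the calling thread. The standalone sketch below is not part of the commit; the object name and the `isValidationEnabled` helper are illustrative stand-ins that mirror the `disableOutputSpecValidation` / `isOutputSpecValidationEnabled` logic added to PairRDDFunctions.scala further down.

import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Dynamically scoped flag with a default of false, mirroring
  // PairRDDFunctions.disableOutputSpecValidation in this patch.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  // Mirrors isOutputSpecValidationEnabled: validation runs only if it is
  // enabled in the conf and not dynamically bypassed.
  def isValidationEnabled(enabledInConf: Boolean): Boolean =
    enabledInConf && !disableOutputSpecValidation.value

  def main(args: Array[String]): Unit = {
    println(isValidationEnabled(enabledInConf = true))    // true: normal batch path
    disableOutputSpecValidation.withValue(true) {
      // Inside this block (and only on this thread), the bypass is in effect.
      println(isValidationEnabled(enabledInConf = true))  // false: checks are skipped
    }
    println(isValidationEnabled(enabledInConf = true))    // true: previous value restored
  }
}

Because the binding is restored when the block exits and is local to the calling thread, the streaming scheduler can bypass validation for its own jobs without mutating SparkConf or affecting jobs submitted elsewhere.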

File tree

6 files changed (+75 lines, -7 lines)

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 17 additions & 2 deletions
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.DynamicVariable

 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -960,7 +961,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -1038,7 +1039,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+    if (isOutputSpecValidationEnabled) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(hadoopConf)
       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1113,8 +1114,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   private[spark] def valueClass: Class[_] = vt.runtimeClass

   private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
+
+  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+  // setting can take effect:
+  private def isOutputSpecValidationEnabled: Boolean = {
+    val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
+    val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+    enabledInConf && !validationDisabled
+  }
 }

 private[spark] object PairRDDFunctions {
   val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+  /**
+   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+   * basis; see SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
 }

docs/configuration.md

Lines changed: 3 additions & 1 deletion
@@ -709,7 +709,9 @@ Apart from these, the following properties are also available, and may be useful
 <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
 used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
 output directories. We recommend that users do not disable this except if trying to achieve compatibility with
-previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
+previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
+This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since
+data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td>
 </tr>
 <tr>
 <td><code>spark.hadoop.cloneConf</code></td>
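
As context for the documentation change above (not part of the diff), `spark.hadoop.validateOutputSpecs` remains an ordinary SparkConf key for batch jobs. A minimal sketch of setting it explicitly follows; the app name, master, and output path are assumptions for illustration only.

import org.apache.spark.{SparkConf, SparkContext}

object ValidateOutputSpecsConfExample {
  def main(args: Array[String]): Unit = {
    // Disable output spec validation globally for a batch job via SparkConf.
    val conf = new SparkConf()
      .setAppName("output-spec-validation-example")         // assumed app name
      .setMaster("local[2]")                                 // assumed local master
      .set("spark.hadoop.validateOutputSpecs", "false")      // skip the pre-write existence checks
    val sc = new SparkContext(conf)
    try {
      // With validation disabled, re-running this save over an existing
      // directory no longer fails the output spec check.
      sc.parallelize(Seq(("a", 1), ("b", 2)))
        .saveAsTextFile("/tmp/output-spec-example")          // assumed output path
    } finally {
      sc.stop()
    }
  }
}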

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 8 additions & 2 deletions
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.matching.Regex

 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
@@ -293,7 +293,13 @@ abstract class DStream[T: ClassTag] (
       // set this DStream's creation site, generate RDDs and then restore the previous call site.
       val prevCallSite = ssc.sparkContext.getCallSite()
       ssc.sparkContext.setCallSite(creationSite)
-      val rddOption = compute(time)
+      // Disable checks for existing output directories in jobs launched by the streaming
+      // scheduler, since we may need to write output to an existing directory during checkpoint
+      // recovery; see SPARK-4835 for more details. We need to have this call here because
+      // compute() might cause Spark jobs to be launched.
+      val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+        compute(time)
+      }
       ssc.sparkContext.setCallSite(prevCallSite)

       rddOption.foreach { case newRDD =>

streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@

 package org.apache.spark.streaming.dstream

-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming.{Duration, Time}
 import scala.reflect.ClassTag


streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 7 additions & 1 deletion
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import akka.actor.{ActorRef, Actor, Props}
 import org.apache.spark.{SparkException, Logging, SparkEnv}
+import org.apache.spark.rdd.PairRDDFunctions
 import org.apache.spark.streaming._


@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private class JobHandler(job: Job) extends Runnable {
     def run() {
       eventActor ! JobStarted(job)
-      job.run()
+      // Disable checks for existing output directories in jobs launched by the streaming scheduler,
+      // since we may need to write output to an existing directory during checkpoint recovery;
+      // see SPARK-4835 for more details.
+      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+        job.run()
+      }
       eventActor ! JobCompleted(job)
     }
   }
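
The JobHandler change above is what covers user code that calls save actions from `foreachRDD`, since those jobs run inside `job.run()` on the scheduler's job-executor thread. A hedged sketch of that usage pattern follows; it is not part of the patch, and the app name, socket source, checkpoint directory, and output path are all assumptions for illustration. (On Spark versions of this era, the `SparkContext._` import supplies the pair-RDD implicits needed for `reduceByKey`.)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark versions)
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSaveSketch {
  def main(args: Array[String]): Unit = {
    // App name, master, host/port, and paths below are illustrative assumptions.
    val conf = new SparkConf().setAppName("streaming-save-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/streaming-save-sketch-checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { (rdd, time) =>
      // A save action invoked from user code; during checkpoint recovery this batch
      // may be re-run, so the output directory for `time` can already exist.
      rdd.map(word => (word, 1))
        .reduceByKey(_ + _)
        .saveAsTextFile("/tmp/streaming-save-sketch-output-" + time.milliseconds)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}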

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 39 additions & 0 deletions
@@ -256,6 +256,45 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }

+  test("recovery with saveAsHadoopFile inside transform operation") {
+    // Regression test for SPARK-4835.
+    //
+    // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+    // was restarted from a checkpoint since the output directory would already exist. However,
+    // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
+    // output matched correctly and not whether the post-restart batch had successfully finished
+    // without throwing any errors. The following test reproduces the same bug with a test that
+    // actually fails because the error in saveAsHadoopFile causes transform() to fail, which
+    // prevents the expected output from being written to the output stream.
+    //
+    // This is not actually a valid use of transform, but it's being used here so that we can test
+    // the fix for SPARK-4835 independently of additional test cleanup.
+    //
+    // After SPARK-5079 is addressed, should be able to remove this test since a strengthened
+    // version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          s.transform { (rdd, time) =>
+            val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+            output.saveAsHadoopFile(
+              new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+              classOf[Text],
+              classOf[IntWritable],
+              classOf[TextOutputFormat[Text, IntWritable]])
+            output
+          }
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
   // replayable input source - TestInputDStream.
