
Commit 0b5abbf

[SPARK-8606] Prevent exceptions in RDD.getPreferredLocations() from crashing DAGScheduler

If `RDD.getPreferredLocations()` throws an exception, it may crash the DAGScheduler and SparkContext. This patch addresses that by wrapping task creation in a try-catch block.

Author: Josh Rosen <[email protected]>

Closes #7023 from JoshRosen/SPARK-8606 and squashes the following commits:

770b169 [Josh Rosen] Fix getPreferredLocations() DAGScheduler crash with try block.
44a9b55 [Josh Rosen] Add test of a buggy getPartitions() method
19aa9f7 [Josh Rosen] Add (failing) regression test for getPreferredLocations() DAGScheduler crash
1 parent 4153776 commit 0b5abbf
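
The failure mode fixed here is easy to sketch: any RDD whose getPreferredLocations() override throws will, without this patch, let the exception escape during task creation and crash the DAGScheduler, taking the SparkContext with it. A minimal illustration (the class name and exception below are hypothetical, not part of this commit):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical reproduction: an RDD whose preferred-locations lookup throws.
// Before this patch, the exception escaped the DAGScheduler during task
// creation; after it, the stage is aborted and the job fails cleanly.
class BrokenLocationsRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
  override def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(1)
  override def getPreferredLocations(split: Partition): Seq[String] =
    throw new RuntimeException("simulated metadata failure")
}

With the patch applied, new BrokenLocationsRDD(sc).count() fails the job with a SparkException on the driver instead of crashing it, which mirrors the regression test added below.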

File tree

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

2 files changed: +53 −15 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 15 deletions
@@ -907,22 +907,29 @@ class DAGScheduler(
         return
     }

-    val tasks: Seq[Task[_]] = stage match {
-      case stage: ShuffleMapStage =>
-        partitionsToCompute.map { id =>
-          val locs = getPreferredLocs(stage.rdd, id)
-          val part = stage.rdd.partitions(id)
-          new ShuffleMapTask(stage.id, taskBinary, part, locs)
-        }
+    val tasks: Seq[Task[_]] = try {
+      stage match {
+        case stage: ShuffleMapStage =>
+          partitionsToCompute.map { id =>
+            val locs = getPreferredLocs(stage.rdd, id)
+            val part = stage.rdd.partitions(id)
+            new ShuffleMapTask(stage.id, taskBinary, part, locs)
+          }

-      case stage: ResultStage =>
-        val job = stage.resultOfJob.get
-        partitionsToCompute.map { id =>
-          val p: Int = job.partitions(id)
-          val part = stage.rdd.partitions(p)
-          val locs = getPreferredLocs(stage.rdd, p)
-          new ResultTask(stage.id, taskBinary, part, locs, id)
-        }
+        case stage: ResultStage =>
+          val job = stage.resultOfJob.get
+          partitionsToCompute.map { id =>
+            val p: Int = job.partitions(id)
+            val part = stage.rdd.partitions(p)
+            val locs = getPreferredLocs(stage.rdd, p)
+            new ResultTask(stage.id, taskBinary, part, locs, id)
+          }
+      }
+    } catch {
+      case NonFatal(e) =>
+        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
+        runningStages -= stage
+        return
     }

     if (tasks.size > 0) {
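
The new catch clause matches on scala.util.control.NonFatal, an extractor that matches every Throwable except JVM-fatal ones such as OutOfMemoryError, StackOverflowError, and InterruptedException. A user exception thrown from getPreferredLocations() therefore aborts only the affected stage, while genuinely fatal errors still propagate. The same pattern in isolation, using a hypothetical helper name (not Spark API):

import scala.util.control.NonFatal

// Hypothetical helper showing the try/NonFatal pattern used above: run a
// block, convert any non-fatal failure into an error value, and let fatal
// errors (OutOfMemoryError, InterruptedException, ...) propagate unchanged.
def attemptTaskCreation[T](body: => T): Either[String, T] =
  try {
    Right(body)
  } catch {
    case NonFatal(e) =>
      Left(s"Task creation failed: $e")
  }

Here attemptTaskCreation(sys.error("boom")) yields a Left describing the failure, whereas a thrown OutOfMemoryError is rethrown rather than caught.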

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -784,6 +784,37 @@ class DAGSchedulerSuite
     assert(sc.parallelize(1 to 10, 2).first() === 1)
   }

+  test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+    val e1 = intercept[DAGSchedulerSuiteDummyException] {
+      val rdd = new MyRDD(sc, 2, Nil) {
+        override def getPartitions: Array[Partition] = {
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      rdd.reduceByKey(_ + _, 1).count()
+    }
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
+  test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+    val e1 = intercept[SparkException] {
+      val rdd = new MyRDD(sc, 2, Nil) {
+        override def getPreferredLocations(split: Partition): Seq[String] = {
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      rdd.count()
+    }
+    assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
   test("accumulator not calculated for resubmitted result stage") {
     // just for register
     val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
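
Both regression tests rely on ScalaTest's intercept, which runs a block, fails the test if no exception of the expected type is thrown, and otherwise returns the caught exception for further assertions. Note the asymmetry between the two tests: rdd.partitions is evaluated on the driver before the job is scheduled, so the first test catches DAGSchedulerSuiteDummyException directly, while getPreferredLocations() fails inside the scheduler and surfaces wrapped in a SparkException. A minimal self-contained sketch of intercept itself (the message text is made up):

import org.scalatest.Assertions._

// Fails the enclosing test unless the block throws an
// IllegalArgumentException; the caught exception is returned for inspection.
val e = intercept[IllegalArgumentException] {
  require(1 > 2, "boom")
}
assert(e.getMessage.contains("boom"))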
