Skip to content

Commit 2c45f78

Browse files
committed
track exceptions in dagscheduler event loop in tests
1 parent bb16405 commit 2c45f78

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ class DAGScheduler(
700700

701701
private[scheduler] def cleanUpAfterSchedulerStop() {
702702
for (job <- activeJobs) {
703-
val error = new SparkException("Job cancelled because SparkContext was shut down")
703+
val error = new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
704704
job.listener.jobFailed(error)
705705
// Tell the listeners that all of the running stages have ended. Don't bother
706706
// cancelling the stages because if the DAG scheduler is stopped, the entire application
@@ -1150,7 +1150,7 @@ class DAGScheduler(
11501150
case TaskResultLost =>
11511151
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
11521152

1153-
case other =>
1153+
case ExecutorLostFailure(_) | TaskKilled | UnknownReason =>
11541154
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
11551155
// will abort the job.
11561156
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
4343
case NonFatal(e) => onError(e)
4444
}
4545
}
46+
47+
override def onError(e: Throwable): Unit = {
48+
logError("Error in DAGSchedulerEventLoop: ", e)
49+
dagScheduler.stop()
50+
throw e
51+
}
52+
4653
}
4754

4855
/**
@@ -260,13 +267,18 @@ class DAGSchedulerSuite
260267

261268
test("zero split job") {
262269
var numResults = 0
270+
var failureReason: Option[Exception] = None
263271
val fakeListener = new JobListener() {
264-
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
265-
override def jobFailed(exception: Exception) = throw exception
272+
override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
273+
override def jobFailed(exception: Exception): Unit = {
274+
failureReason = Some(exception)
275+
}
266276
}
267277
val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
268278
assert(numResults === 0)
269279
cancel(jobId)
280+
assert(failureReason.isDefined)
281+
assert(failureReason.get.getMessage() === "Job 0 cancelled ")
270282
}
271283

272284
test("run trivial job") {
@@ -990,6 +1002,18 @@ class DAGSchedulerSuite
9901002
assert(stackTraceString.contains("org.scalatest.FunSuite"))
9911003
}
9921004

1005+
test("catch errors in event loop") {
1006+
// this is a test of our testing framework -- make sure errors in event loop don't get ignored
1007+
1008+
// just run some bad event that will throw an exception -- we'll give a null TaskEndReason
1009+
val rdd1 = new MyRDD(sc, 1, Nil)
1010+
submit(rdd1, Array(0))
1011+
intercept[Exception] {
1012+
complete(taskSets(0), Seq(
1013+
(null, makeMapStatus("hostA", 1))))
1014+
}
1015+
}
1016+
9931017
/**
9941018
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
9951019
* Note that this checks only the host and not the executor ID.

0 commit comments

Comments
 (0)