Skip to content

Commit fb02e4e

Browse files
squito authored and Andrew Or committed
[SPARK-10248][CORE] track exceptions in dagscheduler event loop in tests
`DAGSchedulerEventLoop` normally only logs errors (so it can continue to process more events, from other jobs). However, this is not desirable in the tests -- the tests should be able to easily detect any exception, and also shouldn't silently succeed if there is an exception.

This was suggested by mateiz on #7699. It may have already turned up an issue in "zero split job".

Author: Imran Rashid <[email protected]>

Closes #8466 from squito/SPARK-10248.

(cherry picked from commit 38d9795)
Signed-off-by: Andrew Or <[email protected]>
1 parent 638b89b commit fb02e4e

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -802,7 +802,8 @@ class DAGScheduler(
802802

803803
private[scheduler] def cleanUpAfterSchedulerStop() {
804804
for (job <- activeJobs) {
805-
val error = new SparkException("Job cancelled because SparkContext was shut down")
805+
val error =
806+
new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
806807
job.listener.jobFailed(error)
807808
// Tell the listeners that all of the running stages have ended. Don't bother
808809
// cancelling the stages because if the DAG scheduler is stopped, the entire application
@@ -1291,7 +1292,7 @@ class DAGScheduler(
12911292
case TaskResultLost =>
12921293
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
12931294

1294-
case other =>
1295+
case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
12951296
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
12961297
// will abort the job.
12971298
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 26 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
4545
case NonFatal(e) => onError(e)
4646
}
4747
}
48+
49+
override def onError(e: Throwable): Unit = {
50+
logError("Error in DAGSchedulerEventLoop: ", e)
51+
dagScheduler.stop()
52+
throw e
53+
}
54+
4855
}
4956

5057
/**
@@ -300,13 +307,18 @@ class DAGSchedulerSuite
300307

301308
test("zero split job") {
302309
var numResults = 0
310+
var failureReason: Option[Exception] = None
303311
val fakeListener = new JobListener() {
304-
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
305-
override def jobFailed(exception: Exception) = throw exception
312+
override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
313+
override def jobFailed(exception: Exception): Unit = {
314+
failureReason = Some(exception)
315+
}
306316
}
307317
val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
308318
assert(numResults === 0)
309319
cancel(jobId)
320+
assert(failureReason.isDefined)
321+
assert(failureReason.get.getMessage() === "Job 0 cancelled ")
310322
}
311323

312324
test("run trivial job") {
@@ -1675,6 +1687,18 @@ class DAGSchedulerSuite
16751687
assert(stackTraceString.contains("org.scalatest.FunSuite"))
16761688
}
16771689

1690+
test("catch errors in event loop") {
1691+
// this is a test of our testing framework -- make sure errors in event loop don't get ignored
1692+
1693+
// just run some bad event that will throw an exception -- we'll give a null TaskEndReason
1694+
val rdd1 = new MyRDD(sc, 1, Nil)
1695+
submit(rdd1, Array(0))
1696+
intercept[Exception] {
1697+
complete(taskSets(0), Seq(
1698+
(null, makeMapStatus("hostA", 1))))
1699+
}
1700+
}
1701+
16781702
test("simple map stage submission") {
16791703
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
16801704
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))

0 commit comments

Comments
 (0)