@@ -796,7 +796,8 @@ class DAGScheduler(

private[scheduler] def cleanUpAfterSchedulerStop() {
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
val error =
new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
Contributor Author:
not necessary, but was helpful for some debugging, figured it couldn't hurt.

job.listener.jobFailed(error)
// Tell the listeners that all of the running stages have ended. Don't bother
// cancelling the stages because if the DAG scheduler is stopped, the entire application
@@ -1290,7 +1291,7 @@ class DAGScheduler(
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.

- case other =>
+ case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
Contributor:
Is this a related change? If we add another TaskEndReason in the future, we might forget to add it here.

Member:
If so, I think the compiler will report the incomplete match as a warning and the build will fail. Right?

Contributor:
I see, that's fine

Contributor Author:
this was needed just for the added test case -- I needed a way to intentionally throw an error inside the event loop, and with this change if event.reason is null, you get an exception.

And yeah, if you add another type but don't include it in this match, you'll get a fatal warning in compilation since it's a sealed trait.
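For readers unfamiliar with this behavior, here is a minimal, self-contained sketch of the exhaustiveness checking discussed above. The `Reason` hierarchy is a stand-in invented for illustration, not Spark's actual TaskEndReason:

```scala
object SealedTraitExhaustivenessSketch {
  // Stand-in hierarchy for illustration only -- not Spark's TaskEndReason.
  sealed trait Reason
  case object Lost extends Reason
  case object Killed extends Reason
  case object Unknown extends Reason

  // Because Reason is sealed, the compiler checks this match for exhaustiveness.
  // Deleting the `Unknown` case produces a "match may not be exhaustive" warning,
  // which fails the build when warnings are treated as fatal (e.g. -Xfatal-warnings).
  def describe(r: Reason): String = r match {
    case Lost    => "executor lost"
    case Killed  => "task killed"
    case Unknown => "unknown reason"
  }

  def main(args: Array[String]): Unit =
    println(describe(Killed))
}
```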

// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
@@ -43,6 +43,13 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
case NonFatal(e) => onError(e)
}
}

+ override def onError(e: Throwable): Unit = {
+ logError("Error in DAGSchedulerEventLoop: ", e)
+ dagScheduler.stop()
+ throw e
+ }

}
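As a self-contained sketch of the pattern the onError override above relies on -- rethrowing from onError so a bad event surfaces to the caller instead of being swallowed -- here is a toy loop. `TinyLoop` and all names below are illustrative only, not Spark's EventLoop API:

```scala
import scala.util.control.NonFatal

object RethrowingLoopSketch {
  // Stripped-down stand-in for an event loop: errors from onReceive are routed to onError.
  abstract class TinyLoop {
    def post(event: Any): Unit = {
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => onError(e)
      }
    }
    protected def onReceive(event: Any): Unit
    protected def onError(e: Throwable): Unit
  }

  def main(args: Array[String]): Unit = {
    val loop = new TinyLoop {
      override protected def onReceive(event: Any): Unit =
        require(event != null, "bad event")
      // Rethrow so the caller (a test) sees the failure instead of it being logged and dropped.
      override protected def onError(e: Throwable): Unit = throw e
    }

    try {
      loop.post(null) // a deliberately bad event
      println("error was swallowed -- this should not happen")
    } catch {
      case e: IllegalArgumentException => println(s"caught as expected: ${e.getMessage}")
    }
  }
}
```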

/**
@@ -297,13 +304,18 @@ class DAGSchedulerSuite

test("zero split job") {
var numResults = 0
+ var failureReason: Option[Exception] = None
val fakeListener = new JobListener() {
- override def taskSucceeded(partition: Int, value: Any) = numResults += 1
- override def jobFailed(exception: Exception) = throw exception
+ override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
+ override def jobFailed(exception: Exception): Unit = {
+ failureReason = Some(exception)
+ }
}
val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
assert(numResults === 0)
cancel(jobId)
+ assert(failureReason.isDefined)
+ assert(failureReason.get.getMessage() === "Job 0 cancelled ")
Contributor Author:
this test used to just log an exception on cancel(jobId). I'm not sure what it was supposed to be testing before. I made the minimal change here by capturing the exception and checking it. But maybe cancel(jobId) should not be creating an exception? Is the idea that if you submit a job with no partitions, it will immediately stop? That way, if you try to cancel it, you'd just hit this case with a harmless logDebug? That suggests we should change handleJobSubmitted to handle empty jobs, the same way we handle it in submitMissingTasks for stages with no partitions.
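To make the alternative suggested here concrete, a toy, self-contained sketch follows (all names invented for illustration; this is not Spark code): a submit path that finishes zero-partition jobs immediately, so a later cancel hits only a harmless no-op branch instead of a jobFailed callback.

```scala
object EmptyJobSketch {
  final case class Job(id: Int, numPartitions: Int)

  def main(args: Array[String]): Unit = {
    var active = Map.empty[Int, Job]

    def submit(job: Job): Unit = {
      if (job.numPartitions == 0) {
        // Nothing will ever run for this job, so finish it immediately instead of
        // keeping it active and waiting for a cancel or a failure.
        println(s"Job ${job.id} finished immediately (zero partitions)")
      } else {
        active += (job.id -> job)
      }
    }

    def cancel(jobId: Int): Unit = {
      if (active.contains(jobId)) {
        active -= jobId
        println(s"Job $jobId cancelled")
      } else {
        // The empty job was never active, so this is the "harmless logDebug" case.
        println(s"No active job $jobId to cancel")
      }
    }

    submit(Job(0, numPartitions = 0))
    cancel(0) // hits the harmless "no active job" branch
  }
}
```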

}

test("run trivial job") {
@@ -1516,6 +1528,18 @@ class DAGSchedulerSuite
assert(stackTraceString.contains("org.scalatest.FunSuite"))
}

test("catch errors in event loop") {
// this is a test of our testing framework -- make sure errors in event loop don't get ignored

// just run some bad event that will throw an exception -- we'll give a null TaskEndReason
val rdd1 = new MyRDD(sc, 1, Nil)
submit(rdd1, Array(0))
intercept[Exception] {
complete(taskSets(0), Seq(
(null, makeMapStatus("hostA", 1))))
}
}

test("simple map stage submission") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))