From e61f7f8a42811176ef1a454fed5753d189bf9b18 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 7 May 2014 15:56:27 -0700 Subject: [PATCH 1/6] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask --- .../apache/spark/scheduler/DAGScheduler.scala | 19 ++++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b3ebaa547de0..630dfb58ded7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { - taskScheduler.cancelTasks(stageId, shouldInterruptThread) - val stageInfo = stageToInfos(stage) - stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + } catch { + case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) + } finally { + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } } } } @@ -1155,7 +1160,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) - dagScheduler.doCancelAllJobs() + try { + dagScheduler.doCancelAllJobs() + } catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) + } dagScheduler.sc.stop() Stop } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 45368328297d..d6bc3a95b115 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -314,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("job cancellation no-kill backend") { + val noKillTaskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { + // normally done by TaskSetManager + taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) + taskSets += taskSet + } + override def cancelTasks(stageId: Int, interruptThread: Boolean) { + throw new UnsupportedOperationException + } + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} + override def defaultParallelism() = 2 + } + val noKillScheduler = new DAGScheduler( + sc, + noKillTaskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env) { + override def runLocally(job: ActiveJob) { + // don't bother with the thread while unit testing + runLocallyWithinThread(job) + } + } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system) + val rdd = makeRdd(1, Nil) + val jobId = submit(rdd, Array(0)) + cancel(jobId) + assert(failure.getMessage === s"Job $jobId cancelled ") + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.size === 1) + assertDataStructuresEmpty + } + test("run trivial shuffle") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) From cc353c83bb5ee55f35a96cfcee48fe4c32ada0ec Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 7 May 2014 16:08:58 -0700 Subject: [PATCH 2/6] scalastyle --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 630dfb58ded7..53e7896083c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1065,7 +1065,8 @@ class DAGScheduler( try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) } catch { - case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) } finally { val stageInfo = stageToInfos(stage) stageInfo.stageFailed(failureReason) From 9312baa7eb18ce20bd816a13f0b8752ccba435b5 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 27 May 2014 15:16:34 -0700 Subject: [PATCH 3/6] code review update --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++---- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 53e7896083c9..3a4c0e9e7c97 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1064,13 +1064,12 @@ class DAGScheduler( if (runningStages.contains(stage)) { try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) - } catch { - case e: UnsupportedOperationException => - logInfo(s"Could not cancel tasks for stage $stageId", e) - } finally { val stageInfo = stageToInfos(stage) stageInfo.stageFailed(failureReason) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } catch { + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d6bc3a95b115..383e116bcecf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F sc = new SparkContext("local", "DAGSchedulerSuite") sparkListener.successfulStages.clear() sparkListener.failedStages.clear() + failure = null sc.addSparkListener(sparkListener) taskSets.clear() cancelledStages.clear() @@ -315,6 +316,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } test("job cancellation no-kill backend") { + // make sure that the DAGScheduler doesn't crash when the TaskScheduler + // doesn't implement killTask() val noKillTaskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE @@ -350,8 +353,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F cancel(jobId) assert(failure.getMessage === s"Job $jobId cancelled ") assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) assertDataStructuresEmpty } From d156d33f390a63b198bbe415056f339b17ebbbad Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 20 Jun 2014 11:04:52 -0700 Subject: [PATCH 4/6] Do nothing in no-kill submitTasks --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 383e116bcecf..29ba47eeb801 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -323,11 +323,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} - override def submitTasks(taskSet: TaskSet) = { - // normally done by TaskSetManager - taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) - taskSets += taskSet - } + override def submitTasks(taskSet: TaskSet) = {} override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } From 80b3205164f90373e273755f8d6ea12cc9bc1536 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 25 Jun 2014 15:47:30 -0700 Subject: [PATCH 5/6] Don't notify listeners of job failure if it wasn't successfully cancelled. --- .../apache/spark/scheduler/DAGScheduler.scala | 11 +++++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3a4c0e9e7c97..4aebfc78dc50 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1038,7 +1038,7 @@ class DAGScheduler( private def failJobAndIndependentStages(job: ActiveJob, failureReason: String, resultStage: Option[Stage]) { val error = new SparkException(failureReason) - job.listener.jobFailed(error) + var unableToCancelStages = false val shouldInterruptThread = if (job.properties == null) false @@ -1070,15 +1070,18 @@ class DAGScheduler( } catch { case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) + unableToCancelStages = true } } } } } - cleanupStateForJobAndIndependentStages(job, resultStage) - - listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + if (!unableToCancelStages) { + job.listener.jobFailed(error) + cleanupStateForJobAndIndependentStages(job, resultStage) + listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) + } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 29ba47eeb801..8dd2a9b9f737 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -323,7 +323,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} - override def submitTasks(taskSet: TaskSet) = {} + override def submitTasks(taskSet: TaskSet) = { + taskSets += taskSet + } override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } @@ -347,9 +349,17 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F val rdd = makeRdd(1, Nil) val jobId = submit(rdd, Array(0)) cancel(jobId) - assert(failure.getMessage === s"Job $jobId cancelled ") - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + // Because the job wasn't actually cancelled, we shouldn't have received a failure message. + assert(failure === null) + + // When the task set completes normally, state should be correctly updated. + complete(taskSets(0), Seq((Success, 42))) + assert(results === Map(0 -> 42)) assertDataStructuresEmpty + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.isEmpty) + assert(sparkListener.successfulStages.contains(0)) } test("run trivial shuffle") { From 42dfa7e360989b7656c3dc8fa6885e6556e009db Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 25 Jun 2014 16:02:29 -0700 Subject: [PATCH 6/6] Got rid of terrible double-negative name --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4aebfc78dc50..c8559a7a8286 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1038,7 +1038,7 @@ class DAGScheduler( private def failJobAndIndependentStages(job: ActiveJob, failureReason: String, resultStage: Option[Stage]) { val error = new SparkException(failureReason) - var unableToCancelStages = false + var ableToCancelStages = true val shouldInterruptThread = if (job.properties == null) false @@ -1070,14 +1070,14 @@ class DAGScheduler( } catch { case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) - unableToCancelStages = true + ableToCancelStages = false } } } } } - if (!unableToCancelStages) { + if (ableToCancelStages) { job.listener.jobFailed(error) cleanupStateForJobAndIndependentStages(job, resultStage) listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))