From e61f7f8a42811176ef1a454fed5753d189bf9b18 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 7 May 2014 15:56:27 -0700 Subject: [PATCH 1/4] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask --- .../apache/spark/scheduler/DAGScheduler.scala | 19 ++++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b3ebaa547de0d..630dfb58ded78 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { - taskScheduler.cancelTasks(stageId, shouldInterruptThread) - val stageInfo = stageToInfos(stage) - stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + } catch { + case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) + } finally { + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } } } } @@ -1155,7 +1160,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) - dagScheduler.doCancelAllJobs() + try { + dagScheduler.doCancelAllJobs() + } catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) + } dagScheduler.sc.stop() Stop } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 45368328297d3..d6bc3a95b115f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -314,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("job cancellation no-kill backend") { + val noKillTaskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { + // normally done by TaskSetManager + taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) + taskSets += taskSet + } + override def cancelTasks(stageId: Int, interruptThread: Boolean) { + throw new UnsupportedOperationException + } + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} + override def defaultParallelism() = 2 + } + val noKillScheduler = new DAGScheduler( + sc, + noKillTaskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env) { + override def runLocally(job: ActiveJob) { + // don't bother with the thread while unit testing + runLocallyWithinThread(job) + } + } + dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor]( + Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system) + val rdd = makeRdd(1, Nil) + val jobId = submit(rdd, Array(0)) + cancel(jobId) + assert(failure.getMessage === s"Job $jobId cancelled ") + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.size === 1) + assertDataStructuresEmpty + } + test("run trivial shuffle") { val shuffleMapRdd = makeRdd(2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) From cc353c83bb5ee55f35a96cfcee48fe4c32ada0ec Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Wed, 7 May 2014 16:08:58 -0700 Subject: [PATCH 2/4] scalastyle --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 630dfb58ded78..53e7896083c96 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1065,7 +1065,8 @@ class DAGScheduler( try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) } catch { - case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) } finally { val stageInfo = stageToInfos(stage) stageInfo.stageFailed(failureReason) From 9312baa7eb18ce20bd816a13f0b8752ccba435b5 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Tue, 27 May 2014 15:16:34 -0700 Subject: [PATCH 3/4] code review update --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++---- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 53e7896083c96..3a4c0e9e7c97d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1064,13 +1064,12 @@ class DAGScheduler( if (runningStages.contains(stage)) { try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) - } catch { - case e: UnsupportedOperationException => - logInfo(s"Could not cancel tasks for stage $stageId", e) - } finally { val stageInfo = stageToInfos(stage) stageInfo.stageFailed(failureReason) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) + } catch { + case e: UnsupportedOperationException => + logInfo(s"Could not cancel tasks for stage $stageId", e) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d6bc3a95b115f..383e116bcecf2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F sc = new SparkContext("local", "DAGSchedulerSuite") sparkListener.successfulStages.clear() sparkListener.failedStages.clear() + failure = null sc.addSparkListener(sparkListener) taskSets.clear() cancelledStages.clear() @@ -315,6 +316,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } test("job cancellation no-kill backend") { + // make sure that the DAGScheduler doesn't crash when the TaskScheduler + // doesn't implement killTask() val noKillTaskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE @@ -350,8 +353,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F cancel(jobId) assert(failure.getMessage === s"Job $jobId cancelled ") assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) assertDataStructuresEmpty } From d156d33f390a63b198bbe415056f339b17ebbbad Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Fri, 20 Jun 2014 11:04:52 -0700 Subject: [PATCH 4/4] Do nothing in no-kill submitTasks --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 383e116bcecf2..29ba47eeb801c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -323,11 +323,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def schedulingMode: SchedulingMode = SchedulingMode.NONE override def start() = {} override def stop() = {} - override def submitTasks(taskSet: TaskSet) = { - // normally done by TaskSetManager - taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) - taskSets += taskSet - } + override def submitTasks(taskSet: TaskSet) = {} override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException }