From 416708e4de04ad6dcc4ce767b42865481ad68dff Mon Sep 17 00:00:00 2001
From: Tom Graves
Date: Tue, 26 Jan 2016 09:31:20 -0600
Subject: [PATCH 1/7] [SPARK-11701] YARN - dynamic allocation and speculation active task accounting wrong

---
 .../apache/spark/mapred/SparkHadoopMapRedUtil.scala |  9 +++++++--
 .../org/apache/spark/scheduler/DAGScheduler.scala   | 11 ++++++++++-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 6841485f4b93..8910922b9d95 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -90,8 +90,13 @@ object SparkHadoopMapRedUtil extends Logging {
         performCommit()
       }
     } else {
-      // Some other attempt committed the output, so we do nothing and signal success
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
+      // Some other attempt committed the output; this generally means speculation. We need to mark
+      // this task as failed so that accounting works correctly.
+      val taskAttemptNumber = TaskContext.get().attemptNumber()
+      val message =
+        s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID"
+      logInfo(message)
+      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b01a10fc136..72e181cbc6de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1134,7 +1134,16 @@ class DAGScheduler(
     }
 
     if (!stageIdToStage.contains(task.stageId)) {
-      // Skip all the actions if the stage has been cancelled.
+      logInfo("skip normal actions as stage cancelled")
+      // Need to handle tasks coming in late (speculative tasks and killed jobs):
+      // post a task end event so that accounting for listeners manually tracking tasks works.
+      // This really should be something other than Success, since the other speculative task
+      // finished first.
+      if (event.reason == Success) {
+        val attemptId = task.stageAttemptId
+        listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType,
+          event.reason, event.taskInfo, event.taskMetrics))
+      }
       return
     }

From 2ab1c902c5e792ad788a673959772ce8f4194124 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 27 Jan 2016 19:23:47 +0000
Subject: [PATCH 2/7] Add test for DAGScheduler changes

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 52 +++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 370a284d2950..f378238905ae 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -134,6 +134,7 @@ class DAGSchedulerSuite
     val successfulStages = new HashSet[Int]
     val failedStages = new ArrayBuffer[Int]
     val stageByOrderOfExecution = new ArrayBuffer[Int]
+    var endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
       submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite
         failedStages += stageInfo.stageId
       }
     }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+      endedTasks += taskEnd.taskInfo.taskId
+    }
   }
 
   var mapOutputTracker: MapOutputTrackerMaster = null
@@ -194,6 +199,7 @@ class DAGSchedulerSuite
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    sparkListener.endedTasks.clear()
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -987,6 +993,46 @@ class DAGSchedulerSuite
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  test("late task events posted") {
+    val baseRdd = new MyRDD(sc, 4, Nil)
+    val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
+    submit(finalRdd, Array(0,1,2,3))
+
+    // complete two tasks
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(0), Success, 42, null, createFakeTaskInfoWithId(0), null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(1), Success, 42, null, createFakeTaskInfoWithId(1), null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0)) 
+    assert(sparkListener.endedTasks.size == 2)
+
+    // finish other 2 tasks
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(2), Success, 42, null, createFakeTaskInfoWithId(2), null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(3), Success, 42, null, createFakeTaskInfoWithId(3), null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 4)
+
+    // verify the stage is done
+    assert(!scheduler.stageIdToStage.contains(0)) 
+
+    // stage should be complete finish one other Successful task to simulate what can happen
+    // with a speculative task and make sure the event is sent out
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(3), Success, 42, null, createFakeTaskInfoWithId(5), null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 5)
+
+    // make sure non-successful tasks also send out event
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(3), UnknownReason, 42, null, createFakeTaskInfoWithId(6), null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 6)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1962,5 +2008,11 @@ class DAGSchedulerSuite
     info
   }
 
+  private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
+    val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+    info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+    info
+  }
+
 }

From 249fc78fd0fe7b3cbe5430a075ab5f9e281c015c Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 27 Jan 2016 20:29:30 +0000
Subject: [PATCH 3/7] fix scala style

---
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f6c387ddcfb0..4ad44182b173 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -991,7 +991,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   test("late task events posted") {
     val baseRdd = new MyRDD(sc, 4, Nil)
     val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
-    submit(finalRdd, Array(0,1,2,3))
+    submit(finalRdd, Array(0, 1, 2, 3))
 
     // complete two tasks
     runEvent(makeCompletionEvent(
@@ -1002,7 +1002,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
-    assert(scheduler.stageIdToStage.contains(0)) 
+    assert(scheduler.stageIdToStage.contains(0))
     assert(sparkListener.endedTasks.size == 2)
 
     // finish other 2 tasks
@@ -1016,7 +1016,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(sparkListener.endedTasks.size == 4)
 
     // verify the stage is done
-    assert(!scheduler.stageIdToStage.contains(0)) 
+    assert(!scheduler.stageIdToStage.contains(0))
 
     // stage should be complete finish one other Successful task to simulate what can happen
     // with a speculative task and make sure the event is sent out

From 5fc19c7b292365644e8e615227f2cfa0b211d261 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 10 Feb 2016 21:49:58 +0000
Subject: [PATCH 4/7] include changes to commonize TaskEnd call from SPARK-13054 and rework

---
 .../apache/spark/scheduler/DAGScheduler.scala | 26 +++++--------------
 .../spark/scheduler/DAGSchedulerSuite.scala   |  2 +-
 2 files changed, 8 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f275e00f065a..ecef083be43f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1144,33 +1144,21 @@ class DAGScheduler(
         null
       }
 
-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // Note: this stage may already have been canceled, in which case this task end event
+    // may be posted after the stage completed event. There's not much we can do here without
+    // introducing additional complexity in the scheduler to wait for all the task end events
+    // before posting the stage completed event.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
     if (!stageIdToStage.contains(task.stageId)) {
-      logInfo("skip normal actions as stage cancelled")
-      // Need to handle tasks coming in late (speculative tasks and killed jobs):
-      // post a task end event so that accounting for listeners manually tracking tasks works.
-      // This really should be something other than Success, since the other speculative task
-      // finished first.
-      if (event.reason == Success) {
-        val attemptId = task.stageAttemptId
-        listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
-      }
+      // Skip all the actions if the stage has been cancelled.
       return
     }
 
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
         stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4ad44182b173..0e4c1b264f9d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -988,7 +988,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
-  test("late task events posted") {
+  test("task events always posted in speculation / when stage is killed") {
     val baseRdd = new MyRDD(sc, 4, Nil)
     val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0, 1, 2, 3))

From 7ea0b3a1b38545cb9efd2415aa3af47366f04641 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 16 Feb 2016 19:16:28 +0000
Subject: [PATCH 5/7] Change back to just log when needsTaskCommit=false

---
 .../org/apache/spark/mapred/SparkHadoopMapRedUtil.scala | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 8910922b9d95..6841485f4b93 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -90,13 +90,8 @@ object SparkHadoopMapRedUtil extends Logging {
         performCommit()
       }
     } else {
-      // Some other attempt committed the output; this generally means speculation. We need to mark
-      // this task as failed so that accounting works correctly.
-      val taskAttemptNumber = TaskContext.get().attemptNumber()
-      val message =
-        s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID"
-      logInfo(message)
-      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+      // Some other attempt committed the output, so we do nothing and signal success
+      logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
     }
   }
 }

From 4d5e402d8781b9ab95d5d7e3e630056729257bc1 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Mon, 14 Mar 2016 18:20:52 +0000
Subject: [PATCH 6/7] Minor comments from review

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++----
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 4 ++--
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ecef083be43f..0ed108aace55 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1144,10 +1144,11 @@ class DAGScheduler(
       }
 
-    // Note: this stage may already have been canceled, in which case this task end event
-    // may be posted after the stage completed event. There's not much we can do here without
-    // introducing additional complexity in the scheduler to wait for all the task end events
-    // before posting the stage completed event.
+    // The stage may have already finished when we get this event -- e.g. maybe it was a 
+    // speculative task. It is important that we send the TaskEnd event in any case, so listeners 
+    // are properly notified and can choose to handle it. For instance, some listeners are
+    // doing their own accounting and if they don't get the task end event they think 
+    // tasks are still running when they really aren't.
     listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0e4c1b264f9d..2a24659a51e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -134,7 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val successfulStages = new HashSet[Int]
     val failedStages = new ArrayBuffer[Int]
     val stageByOrderOfExecution = new ArrayBuffer[Int]
-    var endedTasks = new HashSet[Long]
+    val endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
       submittedStageInfos += stageSubmitted.stageInfo
@@ -1018,7 +1018,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
 
-    // stage should be complete finish one other Successful task to simulate what can happen
+    // Stage should be complete. Finish one other Successful task to simulate what can happen
     // with a speculative task and make sure the event is sent out
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,

From 68b64ae9355308934487a11dd3ff8af703be3450 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Mon, 14 Mar 2016 18:21:28 +0000
Subject: [PATCH 7/7] remove extra spaces

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0ed108aace55..6bdda7ae6cb5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1144,10 +1144,10 @@ class DAGScheduler(
       }
 
-    // The stage may have already finished when we get this event -- e.g. maybe it was a 
-    // speculative task. It is important that we send the TaskEnd event in any case, so listeners 
+    // The stage may have already finished when we get this event -- e.g. maybe it was a
+    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
     // are properly notified and can choose to handle it. For instance, some listeners are
-    // doing their own accounting and if they don't get the task end event they think 
+    // doing their own accounting and if they don't get the task end event they think
     // tasks are still running when they really aren't.
     listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
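
For reference, below is a minimal sketch (not part of the patch series) of the kind of listener-side accounting these changes protect. SparkListener, SparkListenerTaskStart, and SparkListenerTaskEnd are the real Spark listener API of this era; the RunningTaskAccountant class, its field, and its numRunningTasks method are illustrative assumptions only, in the spirit of the dynamic-allocation listener that SPARK-11701 fixes.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Illustrative sketch, not part of the patch series: a listener that does its
// own task accounting. If the scheduler drops a TaskEnd for a late speculative
// task, the runningTasks set never drains and the stage looks permanently busy.
class RunningTaskAccountant extends SparkListener {
  private val runningTasks = mutable.HashSet.empty[Long]

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    runningTasks += taskStart.taskInfo.taskId
  }

  // Correctness depends on the DAGScheduler posting exactly one TaskEnd per
  // TaskStart -- including tasks that finish after their stage was cancelled,
  // which is what the DAGScheduler change above guarantees.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    runningTasks -= taskEnd.taskInfo.taskId
  }

  def numRunningTasks: Int = synchronized { runningTasks.size }
}

With the scheduler change in patches 4 through 7, such a listener sees a TaskEnd event for every started task, so numRunningTasks drains to zero and dynamic allocation can release idle executors instead of holding them for tasks it believes are still running.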