From a9bf31f1ea29d128ebc58663a000e7d249acfb5a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 9 Jun 2015 15:34:20 -0500 Subject: [PATCH 01/32] wip --- .../org/apache/spark/SparkException.scala | 10 ++++++++++ .../org/apache/spark/TaskContextImpl.scala | 1 + .../apache/spark/scheduler/DAGScheduler.scala | 20 ++++++++++++------- .../apache/spark/scheduler/ResultTask.scala | 3 ++- .../spark/scheduler/ShuffleMapTask.scala | 5 +++-- .../org/apache/spark/scheduler/Task.scala | 6 +++++- .../spark/scheduler/TaskSchedulerImpl.scala | 6 ++++++ .../java/org/apache/spark/JavaAPISuite.java | 2 +- .../org/apache/spark/scheduler/FakeTask.scala | 2 +- .../scheduler/NotSerializableFakeTask.scala | 2 +- .../spark/scheduler/TaskContextSuite.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 12 files changed, 46 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 2ebd7a7151a59..b7c2386fd7d87 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -30,3 +30,13 @@ class SparkException(message: String, cause: Throwable) */ private[spark] class SparkDriverExecutionException(cause: Throwable) extends SparkException("Execution error", cause) + +/** + * Exception indicating an error internal to Spark -- it is in an inconsistent state, not due + * to any error by the user + */ +class SparkIllegalStateException(message: String, cause: Throwable) + extends SparkException(message, cause) { + + def this(message: String) = this(message, null) +} diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index b4d572cb52313..077fa038bbe69 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -30,6 +30,7 @@ private[spark] class TaskContextImpl( override val attemptNumber: Int, override val taskMemoryManager: TaskMemoryManager, val runningLocally: Boolean = false, + val stageAttemptId: Int = 0, // for testing val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 75a567fb31520..026fef7a6618f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -834,7 +834,6 @@ class DAGScheduler( // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() - // First figure out the indexes of partition ids to compute. 
val partitionsToCompute: Seq[Int] = { stage match { @@ -894,7 +893,7 @@ class DAGScheduler( partitionsToCompute.map { id => val locs = getPreferredLocs(stage.rdd, id) val part = stage.rdd.partitions(id) - new ShuffleMapTask(stage.id, taskBinary, part, locs) + new ShuffleMapTask(stage.id, stage.attemptId, taskBinary, part, locs) } case stage: ResultStage => @@ -903,7 +902,7 @@ class DAGScheduler( val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - new ResultTask(stage.id, taskBinary, part, locs, id) + new ResultTask(stage.id, stage.attemptId, taskBinary, part, locs, id) } } @@ -977,6 +976,7 @@ class DAGScheduler( val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) + // REVIEWERS: does this need special handling for multiple completions of the same task? outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.attempt, event.reason) @@ -1039,10 +1039,11 @@ class DAGScheduler( val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) + logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { shuffleStage.addOutputLoc(smt.partitionId, status) } + if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") @@ -1106,9 +1107,14 @@ class DAGScheduler( // multiple tasks running concurrently on different executors). In that case, it is possible // the fetch failure has already been handled by the scheduler. if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some(failureMessage)) + if (failedStage.attemptId - 1 > task.stageAttemptId) { + logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + + s" ${task.stageAttemptId}, which has already failed") + } else { + logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + + s"due to a fetch failure from $mapStage (${mapStage.name})") + markStageAsFinished(failedStage, Some(failureMessage)) + } } if (disallowStageRetryForTest) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index c9a124113961f..9c2606e278c54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -41,11 +41,12 @@ import org.apache.spark.rdd.RDD */ private[spark] class ResultTask[T, U]( stageId: Int, + stageAttemptId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient locs: Seq[TaskLocation], val outputId: Int) - extends Task[U](stageId, partition.index) with Serializable { + extends Task[U](stageId, stageAttemptId, partition.index) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { if (locs == null) Nil else locs.toSet.toSeq diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index bd3dd23dfe1ac..14c8c00961487 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -40,14 +40,15 @@ import org.apache.spark.shuffle.ShuffleWriter */ private[spark] class ShuffleMapTask( stageId: Int, + stageAttemptId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation]) - extends Task[MapStatus](stageId, partition.index) with Logging { + extends Task[MapStatus](stageId, stageAttemptId, partition.index) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { - this(0, null, new Partition { override def index: Int = 0 }, null) + this(0, 0, null, new Partition { override def index: Int = 0 }, null) } @transient private val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 15101c64f0503..1f87977af9260 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -43,7 +43,10 @@ import org.apache.spark.util.Utils * @param stageId id of the stage this task belongs to * @param partitionId index of the number in the RDD */ -private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { +private[spark] abstract class Task[T]( + val stageId: Int, + val stageAttemptId: Int, + var partitionId: Int) extends Serializable { /** * Called by [[Executor]] to run this task. @@ -55,6 +58,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex final def run(taskAttemptId: Long, attemptNumber: Int): T = { context = new TaskContextImpl( stageId = stageId, + stageAttemptId = stageAttemptId, partitionId = partitionId, taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ed3dde0fc3055..60173e21b64a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -163,6 +163,12 @@ private[spark] class TaskSchedulerImpl( this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager + val taskSetsPerStage = activeTaskSets.values.filterNot(_.isZombie).groupBy(_.stageId) + taskSetsPerStage.foreach { case (stage, taskSets) => + if (taskSets.size > 1) { + throw new SparkIllegalStateException("more than one active taskSet for stage " + stage) + } + } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index dfd86d3e51e7d..4152f96cf4def 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1011,7 +1011,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics()); + TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, 0, new TaskMetrics()); Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue()); } diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 0a7cb69416a08..188dded7c02f7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import org.apache.spark.TaskContext -class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) { +class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) { override def runTask(context: TaskContext): Int = 0 override def preferredLocations: Seq[TaskLocation] = prefLocs diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala index 9b92f8de56759..383855caefa2f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala @@ -25,7 +25,7 @@ import org.apache.spark.TaskContext * A Task implementation that fails to serialize. */ private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) - extends Task[Array[Byte]](stageId, 0) { + extends Task[Array[Byte]](stageId, 0, 0) { override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte] override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 7c1adc1aef1b6..b9b0eccb0d834 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -41,8 +41,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark } val closureSerializer = SparkEnv.get.closureSerializer.newInstance() val func = (c: TaskContext, i: Iterator[String]) => i.next() - val task = new ResultTask[String, String]( - 0, sc.broadcast(closureSerializer.serialize((rdd, func)).array), rdd.partitions(0), Seq(), 0) + val task = new ResultTask[String, String](0, 0, + sc.broadcast(closureSerializer.serialize((rdd, func)).array), rdd.partitions(0), Seq(), 0) intercept[RuntimeException] { task.run(0, 0) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 0060f3396dcde..a5a9c32aea0a1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -135,7 +135,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex /** * A Task implementation that results in a large serialized task. */ -class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) { +class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) { val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) val random = new Random(0) random.nextBytes(randomBuffer) From 28d70aac00a3fd56f6bee07c9955f5940397c7d3 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 9 Jun 2015 16:02:46 -0500 Subject: [PATCH 02/32] wip on getting a better test case ... 
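
For orientation, the guard this series adds (introduced in the previous patch and refined in later ones) can be sketched roughly as follows. This is a simplified stand-in, not Spark's real Stage/Task classes, and it assumes the stage's latest attempt id corresponds to the attemptId - 1 value used in the DAGScheduler change:

    // Minimal model of the stale-attempt check: a FetchFailed reported by a task from an
    // older attempt of the failed stage is ignored, so the already-running retry of the
    // map stage is never resubmitted a second, concurrent time.
    object StaleAttemptCheck {
      case class StageAttempt(stageId: Int, latestAttemptId: Int)     // illustrative stand-in
      case class ReportingTask(stageId: Int, stageAttemptId: Int)     // illustrative stand-in

      def shouldIgnoreFetchFailure(failedStage: StageAttempt, task: ReportingTask): Boolean =
        task.stageAttemptId < failedStage.latestAttemptId
    }
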
--- .../DAGSchedulerFailureRecoverySuite.scala | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala new file mode 100644 index 0000000000000..fe4ef2deb735d --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala @@ -0,0 +1,105 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.spark.scheduler + +import java.util.Date + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.storage.BlockManagerId +import org.apache.spark._ + +class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { + + // TODO we should run this with a matrix of configurations: different shufflers, + // external shuffle service, etc. But that is really pushing the question of how to run + // such a long test ... + + ignore("no concurrent retries for stage attempts (SPARK-7308)") { + // see SPARK-7308 for a detailed description of the conditions this is trying to recreate. + // note that this is somewhat convoluted for a test case, but isn't actually very unusual + // under a real workload. We only fail the first attempt of stage 2, but that + // could be enough to cause havoc. + + (0 until 100).foreach { idx => + println(new Date() + "\ttrial " + idx) + logInfo(new Date() + "\ttrial " + idx) + + val conf = new SparkConf().set("spark.executor.memory", "100m") + val clusterSc = new SparkContext("local-cluster[5,4,100]", "test-cluster", conf) + val bms = ArrayBuffer[BlockManagerId]() + val stageFailureCount = HashMap[Int, Int]() + clusterSc.addSparkListener(new SparkListener { + override def onBlockManagerAdded(bmAdded: SparkListenerBlockManagerAdded): Unit = { + bms += bmAdded.blockManagerId + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + if (stageCompleted.stageInfo.failureReason.isDefined) { + val stage = stageCompleted.stageInfo.stageId + stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1 + val reason = stageCompleted.stageInfo.failureReason.get + println("stage " + stage + " failed: " + stageFailureCount(stage)) + } + } + }) + try { + val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map { x => (x % 100) -> x }.cache() + rawData.count() + + // choose any executor block manager for the fetch failures. 
Just can't be driver + // to avoid broadcast failures + val someBlockManager = bms.filter{!_.isDriver}(0) + + val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex { case (idx, itr) => + // we want one failure quickly, and more failures after stage 0 has finished its + // second attempt + val stageAttemptId = TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId + if (stageAttemptId == 0) { + if (idx == 0) { + throw new FetchFailedException(someBlockManager, 0, 0, idx, + cause = new RuntimeException("simulated fetch failure")) + } else if (idx > 0 && math.random < 0.2) { + Thread.sleep(5000) + throw new FetchFailedException(someBlockManager, 0, 0, idx, + cause = new RuntimeException("simulated fetch failure")) + } else { + // want to make sure plenty of these finish after task 0 fails, and some even finish + // after the previous stage is retried and this stage retry is started + Thread.sleep((500 + math.random * 5000).toLong) + } + } + itr.map { x => ((x._1 + 5) % 100) -> x._2 } + } + val data = shuffled.mapPartitions { itr => itr.flatMap(_._2) }.collect() + val count = data.size + assert(count === 1e6.toInt) + assert(data.toSet === (1 to 1e6.toInt).toSet) + + assert(stageFailureCount.getOrElse(1, 0) === 0) + assert(stageFailureCount.getOrElse(2, 0) == 1) + assert(stageFailureCount.getOrElse(3, 0) == 0) + } finally { + clusterSc.stop() + } + } + } + + + +} From c443def131fb36a4c915448581a2486802e9ee67 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 14:20:40 -0500 Subject: [PATCH 03/32] better fix and simpler test case --- .../apache/spark/scheduler/DAGScheduler.scala | 65 ++++++++++--------- .../DAGSchedulerFailureRecoverySuite.scala | 51 ++++++++------- 2 files changed, 60 insertions(+), 56 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 026fef7a6618f..e7226a63793a0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1102,44 +1102,47 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) + if (failedStage.attemptId - 1 > task.stageAttemptId) { + logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + + s" ${task.stageAttemptId}, which has already failed") + } else { - // It is likely that we receive multiple FetchFailed for a single stage (because we have - // multiple tasks running concurrently on different executors). In that case, it is possible - // the fetch failure has already been handled by the scheduler. - if (runningStages.contains(failedStage)) { - if (failedStage.attemptId - 1 > task.stageAttemptId) { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId}, which has already failed") - } else { + // It is likely that we receive multiple FetchFailed for a single stage (because we have + // multiple tasks running concurrently on different executors). In that case, it is possible + // the fetch failure has already been handled by the scheduler. 
+ if (runningStages.contains(failedStage)) { logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + s"due to a fetch failure from $mapStage (${mapStage.name})") markStageAsFinished(failedStage, Some(failureMessage)) + } else { + logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " + + s"which is no longer running") } - } - if (disallowStageRetryForTest) { - abortStage(failedStage, "Fetch failure will not retry stage due to testing config") - } else if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) - } - failedStages += failedStage - failedStages += mapStage - // Mark the map whose fetch failed as broken in the map stage - if (mapId != -1) { - mapStage.removeOutputLoc(mapId, bmAddress) - mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } + if (disallowStageRetryForTest) { + abortStage(failedStage, "Fetch failure will not retry stage due to testing config") + } else if (failedStages.isEmpty) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. + // TODO: Cancel running tasks in the stage + logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure") + messageScheduler.schedule(new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } + failedStages += failedStage + failedStages += mapStage + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { + mapStage.removeOutputLoc(mapId, bmAddress) + mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } - // TODO: mark the executor as failed only if there were lots of fetch failures on it - if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { + handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) + } } case commitDenied: TaskCommitDenied => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala index fe4ef2deb735d..f330ef622f7d4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala @@ -26,35 +26,33 @@ import org.apache.spark._ class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { - // TODO we should run this with a matrix of configurations: different shufflers, - // external shuffle service, etc. But that is really pushing the question of how to run - // such a long test ... - - ignore("no concurrent retries for stage attempts (SPARK-7308)") { - // see SPARK-7308 for a detailed description of the conditions this is trying to recreate. 
- // note that this is somewhat convoluted for a test case, but isn't actually very unusual - // under a real workload. We only fail the first attempt of stage 2, but that - // could be enough to cause havoc. - - (0 until 100).foreach { idx => - println(new Date() + "\ttrial " + idx) + test("no concurrent retries for stage attempts (SPARK-8103)") { + // make sure that if we get fetch failures after the retry has started, we ignore them, + // and so don't end up submitting multiple concurrent attempts for the same stage + + (0 until 20).foreach { idx => logInfo(new Date() + "\ttrial " + idx) val conf = new SparkConf().set("spark.executor.memory", "100m") - val clusterSc = new SparkContext("local-cluster[5,4,100]", "test-cluster", conf) + val clusterSc = new SparkContext("local-cluster[2,2,100]", "test-cluster", conf) val bms = ArrayBuffer[BlockManagerId]() val stageFailureCount = HashMap[Int, Int]() + val stageSubmissionCount = HashMap[Int, Int]() clusterSc.addSparkListener(new SparkListener { override def onBlockManagerAdded(bmAdded: SparkListenerBlockManagerAdded): Unit = { bms += bmAdded.blockManagerId } + override def onStageSubmitted(stageSubmited: SparkListenerStageSubmitted): Unit = { + val stage = stageSubmited.stageInfo.stageId + stageSubmissionCount(stage) = stageSubmissionCount.getOrElse(stage, 0) + 1 + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { if (stageCompleted.stageInfo.failureReason.isDefined) { val stage = stageCompleted.stageInfo.stageId stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1 - val reason = stageCompleted.stageInfo.failureReason.get - println("stage " + stage + " failed: " + stageFailureCount(stage)) } } }) @@ -66,7 +64,7 @@ class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { // to avoid broadcast failures val someBlockManager = bms.filter{!_.isDriver}(0) - val shuffled = rawData.groupByKey(100).mapPartitionsWithIndex { case (idx, itr) => + val shuffled = rawData.groupByKey(20).mapPartitionsWithIndex { case (idx, itr) => // we want one failure quickly, and more failures after stage 0 has finished its // second attempt val stageAttemptId = TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId @@ -74,26 +72,29 @@ class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { if (idx == 0) { throw new FetchFailedException(someBlockManager, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) - } else if (idx > 0 && math.random < 0.2) { - Thread.sleep(5000) + } else if (idx == 1) { + Thread.sleep(2000) throw new FetchFailedException(someBlockManager, 0, 0, idx, cause = new RuntimeException("simulated fetch failure")) - } else { - // want to make sure plenty of these finish after task 0 fails, and some even finish - // after the previous stage is retried and this stage retry is started - Thread.sleep((500 + math.random * 5000).toLong) } + } else { + // just to make sure the second attempt doesn't finish before we trigger more failures + // from the first attempt + Thread.sleep(2000) } itr.map { x => ((x._1 + 5) % 100) -> x._2 } } - val data = shuffled.mapPartitions { itr => itr.flatMap(_._2) }.collect() + val data = shuffled.mapPartitions { itr => + itr.flatMap(_._2) + }.cache().collect() val count = data.size assert(count === 1e6.toInt) assert(data.toSet === (1 to 1e6.toInt).toSet) assert(stageFailureCount.getOrElse(1, 0) === 0) - assert(stageFailureCount.getOrElse(2, 0) == 1) - assert(stageFailureCount.getOrElse(3, 0) == 0) + 
assert(stageFailureCount.getOrElse(2, 0) === 1) + assert(stageSubmissionCount.getOrElse(1, 0) <= 2) + assert(stageSubmissionCount.getOrElse(2, 0) === 2) } finally { clusterSc.stop() } From 06a0af63bb2c2a5f61aff16cdee246a72b091f00 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 14:21:08 -0500 Subject: [PATCH 04/32] ignore for jenkins --- .../spark/scheduler/DAGSchedulerFailureRecoverySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala index f330ef622f7d4..580e31bb350e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala @@ -26,7 +26,7 @@ import org.apache.spark._ class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { - test("no concurrent retries for stage attempts (SPARK-8103)") { + ignore("no concurrent retries for stage attempts (SPARK-8103)") { // make sure that if we get fetch failures after the retry has started, we ignore them, // and so don't end up submitting multiple concurrent attempts for the same stage From 6e14683b41d19cc3a60f3595d2f478b4408bf4e1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 14:38:12 -0500 Subject: [PATCH 05/32] unit test just to make sure we fail fast on concurrent attempts --- .../spark/scheduler/TaskSchedulerImplSuite.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a6d5232feb8de..07bdb84cd756f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -128,4 +128,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert(taskDescriptions.map(_.executorId) === Seq("executor0")) } + test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. 
+ val dagScheduler = new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + taskScheduler.setDAGScheduler(dagScheduler) + val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null) + val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null) + taskScheduler.submitTasks(attempt1) + intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)} + } + } From 883fe49d53b08df8be1d75b2a09dbee6f1412073 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 26 May 2015 15:08:03 -0700 Subject: [PATCH 06/32] Unit tests for concurrent stages issue --- .../spark/scheduler/DAGSchedulerSuite.scala | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 47b2868753c0e..38cebe42a2ca7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -101,9 +101,15 @@ class DAGSchedulerSuite /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 val sparkListener = new SparkListener() { + val submittedStageInfos = new HashSet[StageInfo] val successfulStages = new HashSet[Int] val failedStages = new ArrayBuffer[Int] val stageByOrderOfExecution = new ArrayBuffer[Int] + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { + submittedStageInfos += stageSubmitted.stageInfo + } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo stageByOrderOfExecution += stageInfo.stageId @@ -150,6 +156,7 @@ class DAGSchedulerSuite // Enable local execution for this test val conf = new SparkConf().set("spark.localExecution.enabled", "true") sc = new SparkContext("local", "DAGSchedulerSuite", conf) + sparkListener.submittedStageInfos.clear() sparkListener.successfulStages.clear() sparkListener.failedStages.clear() failure = null @@ -547,6 +554,133 @@ class DAGSchedulerSuite assert(sparkListener.failedStages.size == 1) } + /** This tests the case where another FetchFailed comes in while the map stage is getting + * re-run. */ + test("late fetch failures don't cause multiple concurrent attempts for the same map stage") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduceRdd, Array(0, 1)) + + val mapStageId = 0 + def countSubmittedMapStageAttempts(): Int = { + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) + } + + // The map stage should have been submitted. + assert(countSubmittedMapStageAttempts() === 1) + + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + // The MapOutputTracker should know about both map output locations. + assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === + Array("hostA", "hostB")) + + // The first result task fails, with a fetch failure for the output from the first mapper. 
+ runEvent(CompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + null, + Map[Long, Any](), + createFakeTaskInfo(), + null)) + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(sparkListener.failedStages.contains(1)) + + // Trigger resubmission of the failed map stage. + runEvent(ResubmitFailedStages) + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + + // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. + assert(countSubmittedMapStageAttempts() === 2) + + // The second ResultTask fails, with a fetch failure for the output from the second mapper. + runEvent(CompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"), + null, + Map[Long, Any](), + createFakeTaskInfo(), + null)) + + // Another ResubmitFailedStages event should not result result in another attempt for the map + // stage being run concurrently. + runEvent(ResubmitFailedStages) + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(countSubmittedMapStageAttempts() === 2) + + // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't effect anything -- + // our calling it just makes *SURE* it gets called between the desired event and our check. + + } + + /** This tests the case where a late FetchFailed comes in after the map stage has finished getting + * retried and a new reduce stage starts running. + */ + test("extremely late fetch failures don't cause multiple concurrent attempts for the same stage") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduceRdd, Array(0, 1)) + + def countSubmittedReduceStageAttempts(): Int = { + sparkListener.submittedStageInfos.count(_.stageId == 1) + } + def countSubmittedMapStageAttempts(): Int = { + sparkListener.submittedStageInfos.count(_.stageId == 0) + } + + // The map stage should have been submitted. + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(countSubmittedMapStageAttempts() === 1) + + // Complete the map stage. + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + + // The reduce stage should have been submitted. + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(countSubmittedReduceStageAttempts() === 1) + + // The first result task fails, with a fetch failure for the output from the first mapper. + runEvent(CompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + null, + Map[Long, Any](), + createFakeTaskInfo(), + null)) + + // Trigger resubmission of the failed map stage and finish the re-started map task. + runEvent(ResubmitFailedStages) + complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) + + // Because the map stage finished, another attempt for the reduce stage should have been + // submitted, resulting in 2 total attempts for each the map and the reduce stage. + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(countSubmittedMapStageAttempts() === 2) + assert(countSubmittedReduceStageAttempts() === 2) + + // A late FetchFailed arrives from the second task in the original reduce stage. 
+ runEvent(CompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"), + null, + Map[Long, Any](), + createFakeTaskInfo(), + null)) + + // Trigger resubmission of the failed map stage and finish the re-started map task. + runEvent(ResubmitFailedStages) + + // The FetchFailed from the original reduce stage should be ignored. + assert(countSubmittedMapStageAttempts() === 2) + } + test("ignore late map task completions") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) From 7021d288213490e4fff2cd16eab550bd92bafac2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 14:49:14 -0500 Subject: [PATCH 07/32] update test since listenerBus.waitUntilEmpty now throws an exception instead of returning a boolean --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 38cebe42a2ca7..6232bb652f861 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -586,12 +586,12 @@ class DAGSchedulerSuite Map[Long, Any](), createFakeTaskInfo(), null)) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(1)) // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -608,7 +608,7 @@ class DAGSchedulerSuite // Another ResubmitFailedStages event should not result result in another attempt for the map // stage being run concurrently. runEvent(ResubmitFailedStages) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't effect anything -- @@ -634,7 +634,7 @@ class DAGSchedulerSuite } // The map stage should have been submitted. - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // Complete the map stage. @@ -643,7 +643,7 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostB", 1)))) // The reduce stage should have been submitted. - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedReduceStageAttempts() === 1) // The first result task fails, with a fetch failure for the output from the first mapper. @@ -661,7 +661,7 @@ class DAGSchedulerSuite // Because the map stage finished, another attempt for the reduce stage should have been // submitted, resulting in 2 total attempts for each the map and the reduce stage. 
- assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) assert(countSubmittedReduceStageAttempts() === 2) From 55f4a944ed2ffbbe2f8534f1763b4d9a1179a34e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 15:01:15 -0500 Subject: [PATCH 08/32] get rid of more random test case since kays tests are clearer --- .../DAGSchedulerFailureRecoverySuite.scala | 106 ------------------ 1 file changed, 106 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala deleted file mode 100644 index 580e31bb350e8..0000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerFailureRecoverySuite.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.spark.scheduler - -import java.util.Date - -import scala.collection.mutable.{ArrayBuffer, HashMap} - -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.BlockManagerId -import org.apache.spark._ - -class DAGSchedulerFailureRecoverySuite extends SparkFunSuite with Logging { - - ignore("no concurrent retries for stage attempts (SPARK-8103)") { - // make sure that if we get fetch failures after the retry has started, we ignore them, - // and so don't end up submitting multiple concurrent attempts for the same stage - - (0 until 20).foreach { idx => - logInfo(new Date() + "\ttrial " + idx) - - val conf = new SparkConf().set("spark.executor.memory", "100m") - val clusterSc = new SparkContext("local-cluster[2,2,100]", "test-cluster", conf) - val bms = ArrayBuffer[BlockManagerId]() - val stageFailureCount = HashMap[Int, Int]() - val stageSubmissionCount = HashMap[Int, Int]() - clusterSc.addSparkListener(new SparkListener { - override def onBlockManagerAdded(bmAdded: SparkListenerBlockManagerAdded): Unit = { - bms += bmAdded.blockManagerId - } - - override def onStageSubmitted(stageSubmited: SparkListenerStageSubmitted): Unit = { - val stage = stageSubmited.stageInfo.stageId - stageSubmissionCount(stage) = stageSubmissionCount.getOrElse(stage, 0) + 1 - } - - - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { - if (stageCompleted.stageInfo.failureReason.isDefined) { - val stage = stageCompleted.stageInfo.stageId - stageFailureCount(stage) = stageFailureCount.getOrElse(stage, 0) + 1 - } - } - }) - try { - val rawData = clusterSc.parallelize(1 to 1e6.toInt, 20).map { x => (x % 100) -> x }.cache() - rawData.count() - - // choose any executor block manager for the fetch failures. 
Just can't be driver - // to avoid broadcast failures - val someBlockManager = bms.filter{!_.isDriver}(0) - - val shuffled = rawData.groupByKey(20).mapPartitionsWithIndex { case (idx, itr) => - // we want one failure quickly, and more failures after stage 0 has finished its - // second attempt - val stageAttemptId = TaskContext.get().asInstanceOf[TaskContextImpl].stageAttemptId - if (stageAttemptId == 0) { - if (idx == 0) { - throw new FetchFailedException(someBlockManager, 0, 0, idx, - cause = new RuntimeException("simulated fetch failure")) - } else if (idx == 1) { - Thread.sleep(2000) - throw new FetchFailedException(someBlockManager, 0, 0, idx, - cause = new RuntimeException("simulated fetch failure")) - } - } else { - // just to make sure the second attempt doesn't finish before we trigger more failures - // from the first attempt - Thread.sleep(2000) - } - itr.map { x => ((x._1 + 5) % 100) -> x._2 } - } - val data = shuffled.mapPartitions { itr => - itr.flatMap(_._2) - }.cache().collect() - val count = data.size - assert(count === 1e6.toInt) - assert(data.toSet === (1 to 1e6.toInt).toSet) - - assert(stageFailureCount.getOrElse(1, 0) === 0) - assert(stageFailureCount.getOrElse(2, 0) === 1) - assert(stageSubmissionCount.getOrElse(1, 0) <= 2) - assert(stageSubmissionCount.getOrElse(2, 0) === 2) - } finally { - clusterSc.stop() - } - } - } - - - -} From b6bc2489e8115fdb9b75dd6ab38082a08c546bae Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 15:19:45 -0500 Subject: [PATCH 09/32] style --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e7226a63793a0..ca3f04b973479 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1108,8 +1108,8 @@ class DAGScheduler( } else { // It is likely that we receive multiple FetchFailed for a single stage (because we have - // multiple tasks running concurrently on different executors). In that case, it is possible - // the fetch failure has already been handled by the scheduler. + // multiple tasks running concurrently on different executors). In that case, it is + // possible the fetch failure has already been handled by the scheduler. if (runningStages.contains(failedStage)) { logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + s"due to a fetch failure from $mapStage (${mapStage.name})") diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6232bb652f861..090352c3d8b95 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -611,15 +611,16 @@ class DAGSchedulerSuite sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) - // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't effect anything -- - // our calling it just makes *SURE* it gets called between the desired event and our check. 
- + // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't + // effect anything -- our calling it just makes *SURE* it gets called between the desired event + // and our check. } /** This tests the case where a late FetchFailed comes in after the map stage has finished getting * retried and a new reduce stage starts running. */ - test("extremely late fetch failures don't cause multiple concurrent attempts for the same stage") { + test("extremely late fetch failures don't cause multiple concurrent attempts for " + + "the same stage") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId From ecb4e7db817af6199642e96a9f6465b8f4b695d9 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 23:58:40 -0500 Subject: [PATCH 10/32] debugging printlns --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +++ .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ca3f04b973479..b86e6278ca87e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1046,6 +1046,7 @@ class DAGScheduler( if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) + println(s"marking $shuffleStage as finished") logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1072,6 +1073,7 @@ class DAGScheduler( .map(_._2).mkString(", ")) submitStage(shuffleStage) } else { + println(s"looking for newly runnable stage") val newlyRunnable = new ArrayBuffer[Stage] for (shuffleStage <- waitingStages) { logInfo("Missing parents for " + shuffleStage + ": " + @@ -1081,6 +1083,7 @@ class DAGScheduler( { newlyRunnable += shuffleStage } + println(s"newly runnable stages = $newlyRunnable") waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 090352c3d8b95..0cd07656821c0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -88,6 +88,7 @@ class DAGSchedulerSuite // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) taskSets += taskSet + println(s"submitting taskSet $taskSet. 
taskSets = $taskSets") } override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId From 9601b47b1fd84a0e371173d4aed4632b24f14580 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 11 Jun 2015 07:18:28 -0500 Subject: [PATCH 11/32] more debug printlns --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0cd07656821c0..66f6fb5f2b10d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -558,6 +558,7 @@ class DAGSchedulerSuite /** This tests the case where another FetchFailed comes in while the map stage is getting * re-run. */ test("late fetch failures don't cause multiple concurrent attempts for the same map stage") { + println("begin late fetch failure") val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId @@ -572,6 +573,7 @@ class DAGSchedulerSuite // The map stage should have been submitted. assert(countSubmittedMapStageAttempts() === 1) + println("late fetch failure: taskSets = " + taskSets) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) @@ -579,6 +581,7 @@ class DAGSchedulerSuite assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) + println("late fetch failure: taskSets = " + taskSets) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), @@ -622,6 +625,7 @@ class DAGSchedulerSuite */ test("extremely late fetch failures don't cause multiple concurrent attempts for " + "the same stage") { + println("begin extremely late fetch failure") val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId @@ -639,6 +643,7 @@ class DAGSchedulerSuite sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) + println("extremely late fetch failure: taskSets = " + taskSets) // Complete the map stage. complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), @@ -648,6 +653,7 @@ class DAGSchedulerSuite sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedReduceStageAttempts() === 1) + println("extremely late fetch failure: taskSets = " + taskSets) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), From 89a59b6a6ece1ec3d0ed3b3855f4daca094932c0 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 11 Jun 2015 09:54:31 -0500 Subject: [PATCH 12/32] more printlns ... 
--- .../apache/spark/scheduler/DAGScheduler.scala | 5 ++++- .../spark/scheduler/DAGSchedulerSuite.scala | 20 ++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b86e6278ca87e..38dd07af11da1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -835,6 +835,7 @@ class DAGScheduler( stage.pendingTasks.clear() // First figure out the indexes of partition ids to compute. + println(s"finding partitions to compute for $stage") val partitionsToCompute: Seq[Int] = { stage match { case stage: ShuffleMapStage => @@ -928,6 +929,7 @@ class DAGScheduler( s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString) + println(debugString) } } @@ -1083,7 +1085,8 @@ class DAGScheduler( { newlyRunnable += shuffleStage } - println(s"newly runnable stages = $newlyRunnable") + val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)} + println(s"newly runnable stages = $newlyRunnableWithJob") waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 66f6fb5f2b10d..2fd4c78251dc0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -563,7 +563,13 @@ class DAGSchedulerSuite val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) - submit(reduceRdd, Array(0, 1)) + val jobId = submit(reduceRdd, Array(0, 1)) + println(s"late fetch failure: jobId = $jobId") + println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") + println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") + println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") + println(s"late fetch failure: runningStages = ${scheduler.runningStages}") + println(s"late fetch failure: failedStages = ${scheduler.failedStages}") val mapStageId = 0 def countSubmittedMapStageAttempts(): Int = { @@ -574,6 +580,11 @@ class DAGSchedulerSuite assert(countSubmittedMapStageAttempts() === 1) println("late fetch failure: taskSets = " + taskSets) + println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") + println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") + println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") + println(s"late fetch failure: runningStages = ${scheduler.runningStages}") + println(s"late fetch failure: failedStages = ${scheduler.failedStages}") complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) @@ -582,6 +593,13 @@ class DAGSchedulerSuite Array("hostA", "hostB")) println("late fetch failure: taskSets = " + taskSets) + println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos) + println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") + println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") + println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") + println(s"late fetch failure: runningStages = 
${scheduler.runningStages}") + println(s"late fetch failure: failedStages = ${scheduler.failedStages}") + // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), From cb245da3921b7ac1e54edc2f6941fda661a69a75 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 11 Jun 2015 11:26:48 -0500 Subject: [PATCH 13/32] finally found the issue ... clean up debug stuff --- .../apache/spark/scheduler/DAGScheduler.scala | 5 --- .../spark/scheduler/DAGSchedulerSuite.scala | 37 ++++--------------- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index de41053176313..e1bc3413819b6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -851,7 +851,6 @@ class DAGScheduler( stage.pendingTasks.clear() // First figure out the indexes of partition ids to compute. - println(s"finding partitions to compute for $stage") val partitionsToCompute: Seq[Int] = { stage match { case stage: ShuffleMapStage => @@ -945,7 +944,6 @@ class DAGScheduler( s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString) - println(debugString) } } @@ -1064,7 +1062,6 @@ class DAGScheduler( if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) - println(s"marking $shuffleStage as finished") logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1091,7 +1088,6 @@ class DAGScheduler( .map(_._2).mkString(", ")) submitStage(shuffleStage) } else { - println(s"looking for newly runnable stage") val newlyRunnable = new ArrayBuffer[Stage] for (shuffleStage <- waitingStages) { logInfo("Missing parents for " + shuffleStage + ": " + @@ -1102,7 +1098,6 @@ class DAGScheduler( newlyRunnable += shuffleStage } val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)} - println(s"newly runnable stages = $newlyRunnableWithJob") waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3f65996ea5fe8..63610add8e747 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -88,7 +88,6 @@ class DAGSchedulerSuite // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) taskSets += taskSet - println(s"submitting taskSet $taskSet. taskSets = $taskSets") } override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId @@ -558,18 +557,11 @@ class DAGSchedulerSuite /** This tests the case where another FetchFailed comes in while the map stage is getting * re-run. 
*/ test("late fetch failures don't cause multiple concurrent attempts for the same map stage") { - println("begin late fetch failure") val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) - val jobId = submit(reduceRdd, Array(0, 1)) - println(s"late fetch failure: jobId = $jobId") - println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") - println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") - println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") - println(s"late fetch failure: runningStages = ${scheduler.runningStages}") - println(s"late fetch failure: failedStages = ${scheduler.failedStages}") + submit(reduceRdd, Array(0, 1)) val mapStageId = 0 def countSubmittedMapStageAttempts(): Int = { @@ -579,26 +571,14 @@ class DAGSchedulerSuite // The map stage should have been submitted. assert(countSubmittedMapStageAttempts() === 1) - println("late fetch failure: taskSets = " + taskSets) - println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") - println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") - println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") - println(s"late fetch failure: runningStages = ${scheduler.runningStages}") - println(s"late fetch failure: failedStages = ${scheduler.failedStages}") complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) // The MapOutputTracker should know about both map output locations. assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) - - println("late fetch failure: taskSets = " + taskSets) - println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos) - println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}") - println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}") - println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}") - println(s"late fetch failure: runningStages = ${scheduler.runningStages}") - println(s"late fetch failure: failedStages = ${scheduler.failedStages}") + assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) === + Array("hostA", "hostB")) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( @@ -643,7 +623,6 @@ class DAGSchedulerSuite */ test("extremely late fetch failures don't cause multiple concurrent attempts for " + "the same stage") { - println("begin extremely late fetch failure") val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val shuffleId = shuffleDep.shuffleId @@ -661,17 +640,15 @@ class DAGSchedulerSuite sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) - println("extremely late fetch failure: taskSets = " + taskSets) // Complete the map stage. complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) // The reduce stage should have been submitted. 
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedReduceStageAttempts() === 1) - println("extremely late fetch failure: taskSets = " + taskSets) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(CompletionEvent( taskSets(1).tasks(0), From 46bc26aadfe1dcd5cbf37b8d3754bbe019d9664e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 11 Jun 2015 11:28:33 -0500 Subject: [PATCH 14/32] more cleanup of debug garbage --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e1bc3413819b6..6f7e9aa5cfc16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1097,7 +1097,6 @@ class DAGScheduler( { newlyRunnable += shuffleStage } - val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)} waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { From ada7726d4599875355114e59a7d5444f05df4685 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 1 Jul 2015 15:18:28 -0500 Subject: [PATCH 15/32] reviewer feedback --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 13 ++++++++----- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 ++++++++------ .../spark/scheduler/TaskSchedulerImplSuite.scala | 10 +++++++++- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 60173e21b64a8..0a9181345add9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -163,11 +163,14 @@ private[spark] class TaskSchedulerImpl( this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager - val taskSetsPerStage = activeTaskSets.values.filterNot(_.isZombie).groupBy(_.stageId) - taskSetsPerStage.foreach { case (stage, taskSets) => - if (taskSets.size > 1) { - throw new SparkIllegalStateException("more than one active taskSet for stage " + stage) - } + val stage = taskSet.stageId + val conflictingTaskSet = activeTaskSets.exists { case (id, ts) => + // if the id matches, it really should be the same taskSet, but in some unit tests + // we add new taskSets with the same id + id != taskSet.id && !ts.isZombie && ts.stageId == stage + } + if (conflictingTaskSet) { + throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index fd60bf5782590..38bf4f79d6bf0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -554,8 +554,10 @@ class DAGSchedulerSuite assert(sparkListener.failedStages.size == 1) } - /** This tests the case where another FetchFailed comes in while the map stage is getting - * re-run. */ + /** + * This tests the case where another FetchFailed comes in while the map stage is getting + * re-run. 
+ */ test("late fetch failures don't cause multiple concurrent attempts for the same map stage") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) @@ -607,15 +609,15 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) - // Another ResubmitFailedStages event should not result result in another attempt for the map + // Another ResubmitFailedStages event should not result in another attempt for the map // stage being run concurrently. + // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't + // effect anything -- our calling it just makes *SURE* it gets called between the desired event + // and our check. runEvent(ResubmitFailedStages) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) - // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't - // effect anything -- our calling it just makes *SURE* it gets called between the desired event - // and our check. } /** This tests the case where a late FetchFailed comes in after the map stage has finished getting diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 07bdb84cd756f..8af47a0809e0d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -141,7 +141,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null) val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null) taskScheduler.submitTasks(attempt1) - intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)} + intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2) } + + // OK to submit multiple if previous attempts are all zombie + taskScheduler.activeTaskSets(attempt1.id).isZombie = true + taskScheduler.submitTasks(attempt2) + val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null) + intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt3) } + taskScheduler.activeTaskSets(attempt2.id).isZombie = true + taskScheduler.submitTasks(attempt3) } } From 6542b4297e31d02cd1d0e808574f222fa9c2fbc1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 2 Jul 2015 16:58:45 -0500 Subject: [PATCH 16/32] remove extra stageAttemptId --- core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 1 - core/src/main/scala/org/apache/spark/scheduler/Task.scala | 1 - core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 077fa038bbe69..b4d572cb52313 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -30,7 +30,6 @@ private[spark] class TaskContextImpl( override val attemptNumber: Int, override val taskMemoryManager: TaskMemoryManager, val runningLocally: Boolean = false, - val stageAttemptId: Int = 0, // for testing val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 
1f87977af9260..4172356d63044 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -58,7 +58,6 @@ private[spark] abstract class Task[T]( final def run(taskAttemptId: Long, attemptNumber: Int): T = { context = new TaskContextImpl( stageId = stageId, - stageAttemptId = stageAttemptId, partitionId = partitionId, taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 4152f96cf4def..dfd86d3e51e7d 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1011,7 +1011,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, 0, new TaskMetrics()); + TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics()); Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue()); } From b2faef52a5a2b49e8196a2f79f1f0c5349afd63e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jul 2015 11:35:37 -0500 Subject: [PATCH 17/32] faster check for conflicting task sets --- .../spark/scheduler/TaskSchedulerImpl.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 0a9181345add9..014d3c126f70b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -76,6 +76,7 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
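PATCH 15/32 above makes submitTasks reject a task set while another non-zombie attempt of the same stage is still active. A minimal standalone sketch of that invariant, with AttemptLite and a plain buffer standing in for Spark's TaskSetManager bookkeeping:

    import scala.collection.mutable

    object ConflictGuardSketch {
      final case class AttemptLite(stageId: Int, attempt: Int, var isZombie: Boolean = false)

      private val active = mutable.Buffer[AttemptLite]()

      // Reject a new attempt while an earlier, non-zombie attempt of the same stage exists.
      def submit(ts: AttemptLite): Unit = {
        if (active.exists(a => a.stageId == ts.stageId && !a.isZombie)) {
          throw new IllegalStateException(s"more than one active taskSet for stage ${ts.stageId}")
        }
        active += ts
      }

      def main(args: Array[String]): Unit = {
        val first = AttemptLite(stageId = 0, attempt = 0)
        submit(first)
        first.isZombie = true        // once the earlier attempt is a zombie...
        submit(AttemptLite(0, 1))    // ...a retry of the same stage is accepted
      }
    }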
val activeTaskSets = new HashMap[String, TaskSetManager] + val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]] val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] @@ -164,13 +165,14 @@ private[spark] class TaskSchedulerImpl( val manager = createTaskSetManager(taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager val stage = taskSet.stageId - val conflictingTaskSet = activeTaskSets.exists { case (id, ts) => - // if the id matches, it really should be the same taskSet, but in some unit tests - // we add new taskSets with the same id - id != taskSet.id && !ts.isZombie && ts.stageId == stage + val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) + stageTaskSets(taskSet.attempt) = manager + val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => + ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { - throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage") + throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage:" + + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) @@ -224,6 +226,12 @@ private[spark] class TaskSchedulerImpl( */ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { activeTaskSets -= manager.taskSet.id + taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage => + taskSetsForStage -= manager.taskSet.attempt + if (taskSetsForStage.isEmpty) { + taskSetsByStage -= manager.taskSet.stageId + } + } manager.parent.removeSchedulable(manager) logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s" .format(manager.taskSet.id, manager.parent.name)) From 517b6e5398bc4466b332af30cb636fd9b63cc967 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jul 2015 12:15:18 -0500 Subject: [PATCH 18/32] get rid of SparkIllegalStateException --- .../main/scala/org/apache/spark/SparkException.scala | 10 ---------- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++-- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index b7c2386fd7d87..2ebd7a7151a59 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -30,13 +30,3 @@ class SparkException(message: String, cause: Throwable) */ private[spark] class SparkDriverExecutionException(cause: Throwable) extends SparkException("Execution error", cause) - -/** - * Exception indicating an error internal to Spark -- it is in an inconsistent state, not due - * to any error by the user - */ -class SparkIllegalStateException(message: String, cause: Throwable) - extends SparkException(message, cause) { - - def this(message: String) = this(message, null) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 014d3c126f70b..4eebff8dbb516 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -171,7 +171,7 @@ private[spark] class TaskSchedulerImpl( ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { - throw new 
SparkIllegalStateException(s"more than one active taskSet for stage $stage:" + + throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 8af47a0809e0d..55be409afcf31 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -141,13 +141,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null) val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null) taskScheduler.submitTasks(attempt1) - intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2) } + intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } // OK to submit multiple if previous attempts are all zombie taskScheduler.activeTaskSets(attempt1.id).isZombie = true taskScheduler.submitTasks(attempt2) val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null) - intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt3) } + intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } taskScheduler.activeTaskSets(attempt2.id).isZombie = true taskScheduler.submitTasks(attempt3) } From 227b40d0173297af60f929bc8d76ba0b60e4476c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jul 2015 12:17:00 -0500 Subject: [PATCH 19/32] style --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 38bf4f79d6bf0..25cbc3f884b00 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -620,7 +620,8 @@ class DAGSchedulerSuite } - /** This tests the case where a late FetchFailed comes in after the map stage has finished getting + /** + * This tests the case where a late FetchFailed comes in after the map stage has finished getting * retried and a new reduce stage starts running. */ test("extremely late fetch failures don't cause multiple concurrent attempts for " + From a5f7c8c0f0775529f381afa498d0a934eb0e0ff5 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jul 2015 12:22:45 -0500 Subject: [PATCH 20/32] remove comment for reviewers --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e612c4dd2dd47..531581dbd5909 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1001,7 +1001,6 @@ class DAGScheduler( val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) - // REVIEWERS: does this need special handling for multiple completions of the same task? 
outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.attempt, event.reason) From 19685bba6d4615b9b725032a070249255caa0ff8 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 7 Jul 2015 12:35:54 -0500 Subject: [PATCH 21/32] switch to using latestInfo.attemptId, and add comments --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++++- .../src/main/scala/org/apache/spark/scheduler/Stage.scala | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 531581dbd5909..c6029675eab0e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1126,7 +1126,10 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) - if (failedStage.attemptId - 1 > task.stageAttemptId) { + + // failedStage.attemptId is already on the next attempt, so we have to use + // failedStage.latestInfo.attemptId + if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId}, which has already failed") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c59d6e4f5bc04..14ab2b86e1b77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -77,7 +77,13 @@ private[spark] abstract class Stage( id } - def attemptId: Int = nextAttemptId + /** + * The id for the **next** stage attempt. + * + * The unusual meaning of this method means its unlikely to hold the value you are interested in + * -- you probably want to use [[latestInfo.attemptId]] + */ + private[spark] def attemptId: Int = nextAttemptId override final def hashCode(): Int = id override final def equals(other: Any): Boolean = other match { From baf46e19829f9693ceb1a54457b4e1c3602ba560 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 7 Jul 2015 18:05:38 -0700 Subject: [PATCH 22/32] Index active task sets by stage Id rather than by task set id --- .../spark/scheduler/TaskSchedulerImpl.scala | 49 +++++++------------ .../CoarseGrainedSchedulerBackend.scala | 4 +- .../scheduler/TaskSchedulerImplSuite.scala | 4 +- 3 files changed, 23 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4eebff8dbb516..0a89761108726 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,10 +75,9 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
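PATCH 21/32 above compares a failed task's stageAttemptId against failedStage.latestInfo.attemptId so that only a fetch failure from the stage's latest attempt marks the stage as failed. A minimal standalone sketch of that guard; StageLite and FetchFailedLite are illustrative, not Spark's Stage or FetchFailed:

    object StaleFetchFailureSketch {
      final case class StageLite(id: Int, latestAttemptId: Int)
      final case class FetchFailedLite(stageId: Int, stageAttemptId: Int)

      // Act on the failure only if it came from the attempt currently recorded as latest.
      def shouldFailStage(stage: StageLite, failure: FetchFailedLite): Boolean =
        failure.stageAttemptId == stage.latestAttemptId

      def main(args: Array[String]): Unit = {
        val failedStage = StageLite(id = 1, latestAttemptId = 2)
        assert(!shouldFailStage(failedStage, FetchFailedLite(1, 1))) // late failure from an older attempt: ignored
        assert(shouldFailStage(failedStage, FetchFailedLite(1, 2)))  // failure from the current attempt: handled
      }
    }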
- val activeTaskSets = new HashMap[String, TaskSetManager] - val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]] + val stageIdToActiveTaskSet = new HashMap[Int, TaskSetManager] - val taskIdToTaskSetId = new HashMap[Long, String] + val taskIdToStageId = new HashMap[Long, Int] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -163,17 +162,13 @@ private[spark] class TaskSchedulerImpl( logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) - activeTaskSets(taskSet.id) = manager - val stage = taskSet.stageId - val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) - stageTaskSets(taskSet.attempt) = manager - val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => - ts.taskSet != taskSet && !ts.isZombie - } - if (conflictingTaskSet) { - throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + - s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") + stageIdToActiveTaskSet(taskSet.stageId) = manager + val stageId = taskSet.stageId + stageIdToActiveTaskSet.get(stageId).map { activeTaskSet => + throw new IllegalStateException( + s"Active taskSet with id already exists for stage $stageId: ${activeTaskSet.taskSet.id}") } + stageIdToActiveTaskSet(stageId) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { @@ -203,7 +198,7 @@ private[spark] class TaskSchedulerImpl( override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) - activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) => + stageIdToActiveTaskSet.get(stageId).map {tsm => // There are two possible cases here: // 1. The task set manager has been created and some tasks have been scheduled. // In this case, send a kill signal to the executors to kill the task and then abort @@ -225,13 +220,7 @@ private[spark] class TaskSchedulerImpl( * cleaned up. 
*/ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { - activeTaskSets -= manager.taskSet.id - taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage => - taskSetsForStage -= manager.taskSet.attempt - if (taskSetsForStage.isEmpty) { - taskSetsByStage -= manager.taskSet.stageId - } - } + stageIdToActiveTaskSet -= manager.stageId manager.parent.removeSchedulable(manager) logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s" .format(manager.taskSet.id, manager.parent.name)) @@ -252,7 +241,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToStageId(tid) = taskSet.taskSet.stageId taskIdToExecutorId(tid) = execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK @@ -336,13 +325,13 @@ private[spark] class TaskSchedulerImpl( failedExecutor = Some(execId) } } - taskIdToTaskSetId.get(tid) match { - case Some(taskSetId) => + taskIdToStageId.get(tid) match { + case Some(stageId) => if (TaskState.isFinished(state)) { - taskIdToTaskSetId.remove(tid) + taskIdToStageId.remove(tid) taskIdToExecutorId.remove(tid) } - activeTaskSets.get(taskSetId).foreach { taskSet => + stageIdToActiveTaskSet.get(stageId).foreach { taskSet => if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) @@ -380,8 +369,8 @@ private[spark] class TaskSchedulerImpl( val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { taskMetrics.flatMap { case (id, metrics) => - taskIdToTaskSetId.get(id) - .flatMap(activeTaskSets.get) + taskIdToStageId.get(id) + .flatMap(stageIdToActiveTaskSet.get) .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } @@ -414,9 +403,9 @@ private[spark] class TaskSchedulerImpl( def error(message: String) { synchronized { - if (activeTaskSets.nonEmpty) { + if (stageIdToActiveTaskSet.nonEmpty) { // Have each task set throw a SparkException with the error - for ((taskSetId, manager) <- activeTaskSets) { + for ((_, manager) <- stageIdToActiveTaskSet) { try { manager.abort(message) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7c7f70d8a193b..f2bd76aaef8ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -191,8 +191,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { - val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) - scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => + val taskSetId = scheduler.taskIdToStageId(task.taskId) + scheduler.stageIdToActiveTaskSet.get(taskSetId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). 
Consider increasing " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 55be409afcf31..48eda6741b8d6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -144,11 +144,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } // OK to submit multiple if previous attempts are all zombie - taskScheduler.activeTaskSets(attempt1.id).isZombie = true + taskScheduler.stageIdToActiveTaskSet(attempt1.stageId).isZombie = true taskScheduler.submitTasks(attempt2) val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null) intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } - taskScheduler.activeTaskSets(attempt2.id).isZombie = true + taskScheduler.stageIdToActiveTaskSet(attempt2.stageId).isZombie = true taskScheduler.submitTasks(attempt3) } From c0d4d9051c508b08c830e1958aedb93e02838c8b Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Jul 2015 11:51:42 -0500 Subject: [PATCH 23/32] Revert "Index active task sets by stage Id rather than by task set id" This reverts commit baf46e19829f9693ceb1a54457b4e1c3602ba560. --- .../spark/scheduler/TaskSchedulerImpl.scala | 49 ++++++++++++------- .../CoarseGrainedSchedulerBackend.scala | 4 +- .../scheduler/TaskSchedulerImplSuite.scala | 4 +- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 0a89761108726..4eebff8dbb516 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,9 +75,10 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
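PATCH 22/32 keys active task sets by stage id alone, and PATCH 23/32 reverts it. Whatever the actual motivation for the revert (the commit log does not say), one property of a stage-id-only map is worth noting: it can hold just one manager per stage, so a retry overwrites the entry for an earlier attempt. A standalone sketch of that property, with illustrative types only:

    import scala.collection.mutable.HashMap

    object StageOnlyIndexSketch {
      final case class ManagerLite(stageId: Int, attempt: Int)

      def main(args: Array[String]): Unit = {
        val byStageOnly = new HashMap[Int, ManagerLite]
        byStageOnly(0) = ManagerLite(stageId = 0, attempt = 0)
        byStageOnly(0) = ManagerLite(stageId = 0, attempt = 1) // the retry replaces the first entry
        assert(byStageOnly(0).attempt == 1)
        assert(byStageOnly.size == 1) // the attempt-0 manager is no longer reachable through this map
      }
    }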
- val stageIdToActiveTaskSet = new HashMap[Int, TaskSetManager] + val activeTaskSets = new HashMap[String, TaskSetManager] + val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]] - val taskIdToStageId = new HashMap[Long, Int] + val taskIdToTaskSetId = new HashMap[Long, String] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -162,13 +163,17 @@ private[spark] class TaskSchedulerImpl( logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) - stageIdToActiveTaskSet(taskSet.stageId) = manager - val stageId = taskSet.stageId - stageIdToActiveTaskSet.get(stageId).map { activeTaskSet => - throw new IllegalStateException( - s"Active taskSet with id already exists for stage $stageId: ${activeTaskSet.taskSet.id}") + activeTaskSets(taskSet.id) = manager + val stage = taskSet.stageId + val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) + stageTaskSets(taskSet.attempt) = manager + val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => + ts.taskSet != taskSet && !ts.isZombie + } + if (conflictingTaskSet) { + throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } - stageIdToActiveTaskSet(stageId) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { @@ -198,7 +203,7 @@ private[spark] class TaskSchedulerImpl( override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) - stageIdToActiveTaskSet.get(stageId).map {tsm => + activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) => // There are two possible cases here: // 1. The task set manager has been created and some tasks have been scheduled. // In this case, send a kill signal to the executors to kill the task and then abort @@ -220,7 +225,13 @@ private[spark] class TaskSchedulerImpl( * cleaned up. 
*/ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { - stageIdToActiveTaskSet -= manager.stageId + activeTaskSets -= manager.taskSet.id + taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage => + taskSetsForStage -= manager.taskSet.attempt + if (taskSetsForStage.isEmpty) { + taskSetsByStage -= manager.taskSet.stageId + } + } manager.parent.removeSchedulable(manager) logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s" .format(manager.taskSet.id, manager.parent.name)) @@ -241,7 +252,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToStageId(tid) = taskSet.taskSet.stageId + taskIdToTaskSetId(tid) = taskSet.taskSet.id taskIdToExecutorId(tid) = execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK @@ -325,13 +336,13 @@ private[spark] class TaskSchedulerImpl( failedExecutor = Some(execId) } } - taskIdToStageId.get(tid) match { - case Some(stageId) => + taskIdToTaskSetId.get(tid) match { + case Some(taskSetId) => if (TaskState.isFinished(state)) { - taskIdToStageId.remove(tid) + taskIdToTaskSetId.remove(tid) taskIdToExecutorId.remove(tid) } - stageIdToActiveTaskSet.get(stageId).foreach { taskSet => + activeTaskSets.get(taskSetId).foreach { taskSet => if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) @@ -369,8 +380,8 @@ private[spark] class TaskSchedulerImpl( val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { taskMetrics.flatMap { case (id, metrics) => - taskIdToStageId.get(id) - .flatMap(stageIdToActiveTaskSet.get) + taskIdToTaskSetId.get(id) + .flatMap(activeTaskSets.get) .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } @@ -403,9 +414,9 @@ private[spark] class TaskSchedulerImpl( def error(message: String) { synchronized { - if (stageIdToActiveTaskSet.nonEmpty) { + if (activeTaskSets.nonEmpty) { // Have each task set throw a SparkException with the error - for ((_, manager) <- stageIdToActiveTaskSet) { + for ((taskSetId, manager) <- activeTaskSets) { try { manager.abort(message) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f2bd76aaef8ee..7c7f70d8a193b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -191,8 +191,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { - val taskSetId = scheduler.taskIdToStageId(task.taskId) - scheduler.stageIdToActiveTaskSet.get(taskSetId).foreach { taskSet => + val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) + scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). 
Consider increasing " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 48eda6741b8d6..55be409afcf31 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -144,11 +144,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } // OK to submit multiple if previous attempts are all zombie - taskScheduler.stageIdToActiveTaskSet(attempt1.stageId).isZombie = true + taskScheduler.activeTaskSets(attempt1.id).isZombie = true taskScheduler.submitTasks(attempt2) val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null) intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } - taskScheduler.stageIdToActiveTaskSet(attempt2.stageId).isZombie = true + taskScheduler.activeTaskSets(attempt2.id).isZombie = true taskScheduler.submitTasks(attempt3) } From 906d6266efaa899b3fba949428018bd75157731a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 14 Jul 2015 12:01:18 -0500 Subject: [PATCH 24/32] fix merge --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 11b12edf7eaf1..65e17e677bff0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -913,7 +913,7 @@ class DAGScheduler( partitionsToCompute.map { id => val locs = getPreferredLocs(stage.rdd, id) val part = stage.rdd.partitions(id) - new ShuffleMapTask(stage.id, stage.attemptId, taskBinary, part, locs) + new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs) } case stage: ResultStage => @@ -922,7 +922,7 @@ class DAGScheduler( val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - new ResultTask(stage.id, stage.attemptId, taskBinary, part, locs, id) + new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id) } } } catch { @@ -1128,8 +1128,6 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) - // failedStage.attemptId is already on the next attempt, so we have to use - // failedStage.latestInfo.attemptId if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId}, which has already failed") From d7f1ef2350a380569f191ca11e662bdbff015d3d Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Jul 2015 13:08:20 -0500 Subject: [PATCH 25/32] get rid of activeTaskSets --- .../spark/scheduler/TaskSchedulerImpl.scala | 93 +++++++++++-------- .../org/apache/spark/scheduler/TaskSet.scala | 4 +- .../CoarseGrainedSchedulerBackend.scala | 3 +- 3 files changed, 59 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 4eebff8dbb516..ae02ba64020d5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,10 +75,9 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. - val activeTaskSets = new HashMap[String, TaskSetManager] val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]] - val taskIdToTaskSetId = new HashMap[Long, String] + val taskIdToStageIdAndAttempt = new HashMap[Long, (Int, Int)] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -163,10 +162,9 @@ private[spark] class TaskSchedulerImpl( logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) - activeTaskSets(taskSet.id) = manager val stage = taskSet.stageId val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) - stageTaskSets(taskSet.attempt) = manager + stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } @@ -203,19 +201,21 @@ private[spark] class TaskSchedulerImpl( override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) - activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) => - // There are two possible cases here: - // 1. The task set manager has been created and some tasks have been scheduled. - // In this case, send a kill signal to the executors to kill the task and then abort - // the stage. - // 2. The task set manager has been created but no tasks has been scheduled. In this case, - // simply abort the stage. - tsm.runningTasksSet.foreach { tid => - val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + taskSetsByStage.get(stageId).foreach { attempts => + attempts.foreach { case (_, tsm) => + // There are two possible cases here: + // 1. The task set manager has been created and some tasks have been scheduled. + // In this case, send a kill signal to the executors to kill the task and then abort + // the stage. + // 2. The task set manager has been created but no tasks has been scheduled. In this case, + // simply abort the stage. + tsm.runningTasksSet.foreach { tid => + val execId = taskIdToExecutorId(tid) + backend.killTask(tid, execId, interruptThread) + } + tsm.abort("Stage %s cancelled".format(stageId)) + logInfo("Stage %d was cancelled".format(stageId)) } - tsm.abort("Stage %s cancelled".format(stageId)) - logInfo("Stage %d was cancelled".format(stageId)) } } @@ -225,9 +225,8 @@ private[spark] class TaskSchedulerImpl( * cleaned up. 
*/ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { - activeTaskSets -= manager.taskSet.id taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage => - taskSetsForStage -= manager.taskSet.attempt + taskSetsForStage -= manager.taskSet.stageAttemptId if (taskSetsForStage.isEmpty) { taskSetsByStage -= manager.taskSet.stageId } @@ -252,7 +251,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToStageIdAndAttempt(tid) = (taskSet.taskSet.stageId, taskSet.taskSet.stageAttemptId) taskIdToExecutorId(tid) = execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK @@ -336,26 +335,24 @@ private[spark] class TaskSchedulerImpl( failedExecutor = Some(execId) } } - taskIdToTaskSetId.get(tid) match { - case Some(taskSetId) => + taskSetManagerForTask(tid) match { + case Some(taskSet) => if (TaskState.isFinished(state)) { - taskIdToTaskSetId.remove(tid) + taskIdToStageIdAndAttempt.remove(tid) taskIdToExecutorId.remove(tid) } - activeTaskSets.get(taskSetId).foreach { taskSet => - if (state == TaskState.FINISHED) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) - } + if (state == TaskState.FINISHED) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + - "likely the result of receiving duplicate task finished status updates)") - .format(state, tid)) + "likely the result of receiving duplicate task finished status updates)") + .format(state, tid)) } } catch { case e: Exception => logError("Exception in statusUpdate", e) @@ -380,9 +377,13 @@ private[spark] class TaskSchedulerImpl( val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { taskMetrics.flatMap { case (id, metrics) => - taskIdToTaskSetId.get(id) - .flatMap(activeTaskSets.get) - .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) + for { + (stageId, stageAttemptId) <- taskIdToStageIdAndAttempt.get(id) + attempts <- taskSetsByStage.get(stageId) + taskSetMgr <- attempts.get(stageAttemptId) + } yield { + (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) + } } } dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) @@ -414,9 +415,12 @@ private[spark] class TaskSchedulerImpl( def error(message: String) { synchronized { - if (activeTaskSets.nonEmpty) { + if (taskSetsByStage.nonEmpty) { // Have each task set throw a SparkException with the error - for ((taskSetId, manager) <- activeTaskSets) { + for { + attempts <- taskSetsByStage.values + manager <- attempts.values + } { try { manager.abort(message) } catch { @@ -537,6 +541,21 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() + private[scheduler] def taskSetManagerForTask(taskId: Long): Option[TaskSetManager] = { + 
taskIdToStageIdAndAttempt.get(taskId).flatMap{ case (stageId, stageAttemptId) => + taskSetManagerForAttempt(stageId, stageAttemptId) + } + } + + private[scheduler] def taskSetManagerForAttempt(stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = { + for { + attempts <- taskSetsByStage.get(stageId) + manager <- attempts.get(stageAttemptId) + } yield { + manager + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index c3ad325156f53..be8526ba9b94f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -26,10 +26,10 @@ import java.util.Properties private[spark] class TaskSet( val tasks: Array[Task[_]], val stageId: Int, - val attempt: Int, + val stageAttemptId: Int, val priority: Int, val properties: Properties) { - val id: String = stageId + "." + attempt + val id: String = stageId + "." + stageAttemptId override def toString: String = "TaskSet " + id } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7c7f70d8a193b..69cea02674388 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -191,8 +191,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { - val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) - scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => + scheduler.taskSetManagerForTask(task.taskId).foreach { taskSet => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + From 88b61cce4e9ffe4d379680438cb9d67c6bf3f7b6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Jul 2015 13:09:02 -0500 Subject: [PATCH 26/32] add tests to make sure that TaskSchedulerImpl schedules correctly with zombie attempts --- .../org/apache/spark/scheduler/FakeTask.scala | 6 +- .../scheduler/TaskSchedulerImplSuite.scala | 99 +++++++++++++++++-- 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 188dded7c02f7..b3ca150195a5f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -31,12 +31,16 @@ object FakeTask { * locations for each task (given as varargs) if this sequence is not empty. 
*/ def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { + createTaskSet(numTasks, 0, prefLocs: _*) + } + + def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { if (prefLocs.size != 0 && prefLocs.size != numTasks) { throw new IllegalArgumentException("Wrong number of task locations") } val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil) } - new TaskSet(tasks, 0, 0, 0, null) + new TaskSet(tasks, 0, stageAttemptId, 0, null) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 55be409afcf31..199d51275c51d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -33,7 +33,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. - val dagScheduler = new DAGScheduler(sc, taskScheduler) { + new DAGScheduler(sc, taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } @@ -67,7 +67,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. - val dagScheduler = new DAGScheduler(sc, taskScheduler) { + new DAGScheduler(sc, taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} override def executorAdded(execId: String, host: String) {} } @@ -138,18 +138,103 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L override def executorAdded(execId: String, host: String) {} } taskScheduler.setDAGScheduler(dagScheduler) - val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null) - val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null) + val attempt1 = FakeTask.createTaskSet(1, 0) + val attempt2 = FakeTask.createTaskSet(1, 1) taskScheduler.submitTasks(attempt1) intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } // OK to submit multiple if previous attempts are all zombie - taskScheduler.activeTaskSets(attempt1.id).isZombie = true + taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId) + .get.isZombie = true taskScheduler.submitTasks(attempt2) - val attempt3 = new TaskSet(Array(new FakeTask(0)), 0, 2, 0, null) + val attempt3 = FakeTask.createTaskSet(1, 2) intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } - taskScheduler.activeTaskSets(attempt2.id).isZombie = true + taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId) + .get.isZombie = true taskScheduler.submitTasks(attempt3) } + test("don't schedule more tasks after a taskset is zombie") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. 
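The new tests added in PATCH 26/32 exercise the rule that a zombie attempt gets no further tasks on resource offers, while a fresh attempt of the same stage still does. A minimal standalone sketch of that scheduling rule; AttemptLite and schedule() are illustrative, not the real TaskSchedulerImpl.resourceOffers:

    object ZombieSchedulingSketch {
      final case class AttemptLite(stageId: Int, attempt: Int, var isZombie: Boolean, var pending: Int)

      // Hand out up to freeCores tasks, skipping zombie attempts entirely.
      def schedule(attempts: Seq[AttemptLite], freeCores: Int): Int = {
        var launched = 0
        for (a <- attempts if !a.isZombie) {
          val n = math.min(a.pending, freeCores - launched)
          a.pending -= n
          launched += n
        }
        launched
      }

      def main(args: Array[String]): Unit = {
        val attempt1 = AttemptLite(stageId = 0, attempt = 0, isZombie = false, pending = 10)
        assert(schedule(Seq(attempt1), freeCores = 1) == 1)           // live attempt gets a task
        attempt1.isZombie = true
        assert(schedule(Seq(attempt1), freeCores = 1) == 0)           // zombie gets nothing
        val attempt2 = AttemptLite(stageId = 0, attempt = 1, isZombie = false, pending = 10)
        assert(schedule(Seq(attempt1, attempt2), freeCores = 1) == 1) // the new attempt is scheduled
      }
    }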
+ new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val numFreeCores = 1 + val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores)) + val attempt1 = FakeTask.createTaskSet(10) + + // submit attempt 1, offer some resources, some tasks get scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(1 === taskDescriptions.length) + + // now mark attempt 1 as a zombie + taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId) + .get.isZombie = true + + // don't schedule anything on another resource offer + val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten + assert(0 === taskDescriptions2.length) + + // if we schedule another attempt for the same stage, it should get scheduled + val attempt2 = FakeTask.createTaskSet(10, 1) + + // submit attempt 2, offer some resources, some tasks get scheduled + taskScheduler.submitTasks(attempt2) + val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten + assert(1 === taskDescriptions3.length) + val mgr = taskScheduler.taskSetManagerForTask(taskDescriptions3(0).taskId).get + assert(mgr.taskSet.stageAttemptId === 1) + } + + test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val numFreeCores = 10 + val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores)) + val attempt1 = FakeTask.createTaskSet(10) + + // submit attempt 1, offer some resources, some tasks get scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(10 === taskDescriptions.length) + + // now mark attempt 1 as a zombie + val mgr1 = taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId).get + mgr1.isZombie = true + + // don't schedule anything on another resource offer + val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten + assert(0 === taskDescriptions2.length) + + //submit attempt 2 + val attempt2 = FakeTask.createTaskSet(10, 1) + taskScheduler.submitTasks(attempt2) + + // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were + // already submitted, and then they finish) + taskScheduler.taskSetFinished(mgr1) + + // now with another resource offer, we should still schedule all the tasks in attempt2 + val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten + assert(10 === taskDescriptions3.length) + + taskDescriptions3.foreach{ task => + val mgr = taskScheduler.taskSetManagerForTask(task.taskId).get + assert(mgr.taskSet.stageAttemptId === 1) + } + } + } From c04707ee9e177a6150eb6df250be427faec6f137 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Jul 2015 13:10:55 -0500 Subject: [PATCH 27/32] style --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 7 +++++-- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 2 files changed, 6 insertions(+), 3 
deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ae02ba64020d5..ec306c1e20456 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -251,7 +251,8 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToStageIdAndAttempt(tid) = (taskSet.taskSet.stageId, taskSet.taskSet.stageAttemptId) + taskIdToStageIdAndAttempt(tid) = + (taskSet.taskSet.stageId, taskSet.taskSet.stageAttemptId) taskIdToExecutorId(tid) = execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK @@ -547,7 +548,9 @@ private[spark] class TaskSchedulerImpl( } } - private[scheduler] def taskSetManagerForAttempt(stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = { + private[scheduler] def taskSetManagerForAttempt( + stageId: Int, + stageAttemptId: Int): Option[TaskSetManager] = { for { attempts <- taskSetsByStage.get(stageId) manager <- attempts.get(stageAttemptId) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 199d51275c51d..cb0dce44536d1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -219,7 +219,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten assert(0 === taskDescriptions2.length) - //submit attempt 2 + // submit attempt 2 val attempt2 = FakeTask.createTaskSet(10, 1) taskScheduler.submitTasks(attempt2) From 4470fa1aa10c902728a44da8ee06bbfca32b5108 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 15 Jul 2015 13:21:13 -0500 Subject: [PATCH 28/32] rename --- .../spark/scheduler/TaskSchedulerImpl.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ec306c1e20456..a34b67db388f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,7 +75,7 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
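The taskSetManagerForAttempt helper kept through these commits flattens the stageId -> attemptId -> manager lookup with a for-comprehension over Options. A minimal standalone sketch of that lookup style, with string placeholders instead of TaskSetManager:

    import scala.collection.mutable.HashMap

    object OptionLookupSketch {
      def main(args: Array[String]): Unit = {
        val byStageIdAndAttempt = HashMap(0 -> HashMap(0 -> "tsm-0.0", 1 -> "tsm-0.1"))

        // A for-comprehension over Options yields None as soon as either level is missing.
        def managerFor(stageId: Int, attemptId: Int): Option[String] =
          for {
            attempts <- byStageIdAndAttempt.get(stageId)
            manager  <- attempts.get(attemptId)
          } yield manager

        assert(managerFor(0, 1).contains("tsm-0.1"))
        assert(managerFor(1, 0).isEmpty) // unknown stage: None, no exception
      }
    }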
- val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]] + val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] val taskIdToStageIdAndAttempt = new HashMap[Long, (Int, Int)] val taskIdToExecutorId = new HashMap[Long, String] @@ -163,7 +163,8 @@ private[spark] class TaskSchedulerImpl( this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId - val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) + val stageTaskSets = + taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie @@ -201,7 +202,7 @@ private[spark] class TaskSchedulerImpl( override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) - taskSetsByStage.get(stageId).foreach { attempts => + taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts => attempts.foreach { case (_, tsm) => // There are two possible cases here: // 1. The task set manager has been created and some tasks have been scheduled. @@ -225,10 +226,10 @@ private[spark] class TaskSchedulerImpl( * cleaned up. */ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { - taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage => + taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage => taskSetsForStage -= manager.taskSet.stageAttemptId if (taskSetsForStage.isEmpty) { - taskSetsByStage -= manager.taskSet.stageId + taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } manager.parent.removeSchedulable(manager) @@ -380,7 +381,7 @@ private[spark] class TaskSchedulerImpl( taskMetrics.flatMap { case (id, metrics) => for { (stageId, stageAttemptId) <- taskIdToStageIdAndAttempt.get(id) - attempts <- taskSetsByStage.get(stageId) + attempts <- taskSetsByStageIdAndAttempt.get(stageId) taskSetMgr <- attempts.get(stageAttemptId) } yield { (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) @@ -416,10 +417,10 @@ private[spark] class TaskSchedulerImpl( def error(message: String) { synchronized { - if (taskSetsByStage.nonEmpty) { + if (taskSetsByStageIdAndAttempt.nonEmpty) { // Have each task set throw a SparkException with the error for { - attempts <- taskSetsByStage.values + attempts <- taskSetsByStageIdAndAttempt.values manager <- attempts.values } { try { @@ -552,7 +553,7 @@ private[spark] class TaskSchedulerImpl( stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = { for { - attempts <- taskSetsByStage.get(stageId) + attempts <- taskSetsByStageIdAndAttempt.get(stageId) manager <- attempts.get(stageAttemptId) } yield { manager From 6bc23af9817675d18baec37d2c8b9d54d0518efa Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 16 Jul 2015 20:00:25 -0500 Subject: [PATCH 29/32] update log msg --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 65e17e677bff0..d88fd434c8855 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1130,7 +1130,8 @@ class DAGScheduler( if (failedStage.latestInfo.attemptId != 
task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId}, which has already failed") + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + + s"(attempt ID ${failedStage.latestInfo.attemptId}) running") } else { // It is likely that we receive multiple FetchFailed for a single stage (because we have From 584acd4ce80ad0c8409638c49874ea1e46099bc6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Jul 2015 13:30:52 -0500 Subject: [PATCH 30/32] simplify going from taskId to taskSetMgr --- .../spark/scheduler/TaskSchedulerImpl.scala | 25 ++++++------------- .../CoarseGrainedSchedulerBackend.scala | 4 +-- .../scheduler/TaskSchedulerImplSuite.scala | 4 +-- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a34b67db388f6..1705e7f962de2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -75,9 +75,9 @@ private[spark] class TaskSchedulerImpl( // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. - val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] + private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] - val taskIdToStageIdAndAttempt = new HashMap[Long, (Int, Int)] + private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @@ -252,8 +252,7 @@ private[spark] class TaskSchedulerImpl( for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId - taskIdToStageIdAndAttempt(tid) = - (taskSet.taskSet.stageId, taskSet.taskSet.stageAttemptId) + taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK @@ -337,10 +336,10 @@ private[spark] class TaskSchedulerImpl( failedExecutor = Some(execId) } } - taskSetManagerForTask(tid) match { + taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => if (TaskState.isFinished(state)) { - taskIdToStageIdAndAttempt.remove(tid) + taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid) } if (state == TaskState.FINISHED) { @@ -379,12 +378,8 @@ private[spark] class TaskSchedulerImpl( val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { taskMetrics.flatMap { case (id, metrics) => - for { - (stageId, stageAttemptId) <- taskIdToStageIdAndAttempt.get(id) - attempts <- taskSetsByStageIdAndAttempt.get(stageId) - taskSetMgr <- attempts.get(stageAttemptId) - } yield { - (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) + taskIdToTaskSetManager.get(id).map { taskSetMgr => + (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) } } } @@ -543,12 +538,6 @@ private[spark] class TaskSchedulerImpl( override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() - private[scheduler] def taskSetManagerForTask(taskId: Long): Option[TaskSetManager] = { - taskIdToStageIdAndAttempt.get(taskId).flatMap{ case (stageId, stageAttemptId) => - taskSetManagerForAttempt(stageId, stageAttemptId) - } - } - private[scheduler] def taskSetManagerForAttempt( stageId: Int, stageAttemptId: 
Int): Option[TaskSetManager] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 69cea02674388..8d2369ef99aef 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -191,14 +191,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { - scheduler.taskSetManagerForTask(task.taskId).foreach { taskSet => + scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) - taskSet.abort(msg) + taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index cb0dce44536d1..b734d3ae0be7c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -188,7 +188,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L taskScheduler.submitTasks(attempt2) val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten assert(1 === taskDescriptions3.length) - val mgr = taskScheduler.taskSetManagerForTask(taskDescriptions3(0).taskId).get + val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get assert(mgr.taskSet.stageAttemptId === 1) } @@ -232,7 +232,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert(10 === taskDescriptions3.length) taskDescriptions3.foreach{ task => - val mgr = taskScheduler.taskSetManagerForTask(task.taskId).get + val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get assert(mgr.taskSet.stageAttemptId === 1) } } From e01b7aa9ae3640a23ee769f7ce779332e25db5b6 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Jul 2015 13:34:55 -0500 Subject: [PATCH 31/32] fix some comments, style --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++---- .../apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 25cbc3f884b00..b296be1e07657 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -611,9 +611,9 @@ class DAGSchedulerSuite // Another ResubmitFailedStages event should not result in another attempt for the map // stage being run concurrently. 
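Before the DAGSchedulerSuite hunk continues below, it is worth spelling out the data-structure change patch 30 makes above: instead of mapping a task ID to a (stageId, stageAttemptId) pair and then walking the nested per-stage map, the scheduler keeps a direct taskIdToTaskSetManager map, which is what statusUpdate and CoarseGrainedSchedulerBackend actually need. The sketch below contrasts the two shapes; Manager and TaskIdLookup are hypothetical stand-ins, not Spark's real types.

import scala.collection.mutable.HashMap

// Hypothetical stand-in for a TaskSetManager.
case class Manager(stageId: Int, stageAttemptId: Int)

object TaskIdLookup {
  val managersByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Manager]]

  // Before: task ID -> (stage ID, attempt ID), then a second walk through the nested map.
  val taskIdToStageIdAndAttempt = new HashMap[Long, (Int, Int)]

  def lookupViaIds(tid: Long): Option[Manager] =
    for {
      (stageId, attemptId) <- taskIdToStageIdAndAttempt.get(tid)
      attempts <- managersByStageIdAndAttempt.get(stageId)
      manager <- attempts.get(attemptId)
    } yield manager

  // After: one map straight from task ID to its manager; a single get replaces the
  // three-step for-comprehension.
  val taskIdToManager = new HashMap[Long, Manager]

  def lookupDirect(tid: Long): Option[Manager] = taskIdToManager.get(tid)
}

The nested per-stage map is still kept (now private) for stage-level operations such as cancelTasks and taskSetFinished.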
- // NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't - // effect anything -- our calling it just makes *SURE* it gets called between the desired event - // and our check. + // NOTE: the actual ResubmitFailedStages may get called at any time during this, but it + // shouldn't affect anything -- our calling it just makes *SURE* it gets called between the + // desired event and our check. runEvent(ResubmitFailedStages) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) @@ -680,7 +680,8 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) - // Trigger resubmission of the failed map stage and finish the re-started map task. + // Running ResubmitFailedStages shouldn't result in any more attempts for the map stage, because + // the FetchFailed should have been ignored runEvent(ResubmitFailedStages) // The FetchFailed from the original reduce stage should be ignored. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index b734d3ae0be7c..c2edd4c317d6e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -231,7 +231,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten assert(10 === taskDescriptions3.length) - taskDescriptions3.foreach{ task => + taskDescriptions3.foreach { task => val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get assert(mgr.taskSet.stageAttemptId === 1) } From fb3acfc6b7f3f5738061dbe033de33df4d84cbd9 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Jul 2015 14:56:13 -0500 Subject: [PATCH 32/32] fix log msg --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d88fd434c8855..46397b98269aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1142,8 +1142,8 @@ class DAGScheduler( s"due to a fetch failure from $mapStage (${mapStage.name})") markStageAsFinished(failedStage, Some(failureMessage)) } else { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage, " + s"which is no longer running") + logDebug(s"Received fetch failure from $task, but it's from $failedStage which is no " + s"longer running") if (disallowStageRetryForTest) {
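Putting patches 29 through 32 together, the fetch-failure path ends up with two guards: a FetchFailed from a stale attempt (one that is no longer the stage's latest) is logged and ignored, and a stage that is no longer running only gets a debug-level message. The sketch below is a simplified, self-contained rendering of that control flow with hypothetical stand-in types; the real logic lives in DAGScheduler.handleTaskCompletion and also consults runningStages, disallowStageRetryForTest, and the map-stage resubmission machinery.

// Hypothetical, simplified stand-ins for the scheduler's internal types.
case class StageInfo(attemptId: Int)
case class Stage(name: String, latestInfo: StageInfo)
case class CompletedTask(stageAttemptId: Int)

object FetchFailureGuard {
  def handleFetchFailure(
      failedStage: Stage,
      task: CompletedTask,
      stageStillRunning: Boolean,
      markStageAsFailed: String => Unit): Unit = {
    if (stageStillRunning) {
      if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
        // A newer attempt of this stage is already running, so this failure is stale.
        println(s"Ignoring fetch failure from attempt ${task.stageAttemptId} of " +
          s"${failedStage.name}; attempt ${failedStage.latestInfo.attemptId} is more recent")
      } else {
        // Latest attempt: fail the stage so it and its parent map stage can be resubmitted.
        markStageAsFailed(s"fetch failure in ${failedStage.name}")
      }
    } else {
      // The scheduler already dealt with this stage, e.g. via an earlier FetchFailed from
      // a concurrently running task attempt; just note it quietly.
      println(s"Received fetch failure for ${failedStage.name}, which is no longer running")
    }
  }
}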