From d0647fedd330bdeb3f5178c7066044b2fe3ed7ee Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 15 Apr 2016 16:44:23 -0700 Subject: [PATCH 01/11] [SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fetch failure --- .../org/apache/spark/MapOutputTracker.scala | 17 +- .../apache/spark/scheduler/DAGScheduler.scala | 195 ++++---- .../spark/scheduler/DAGSchedulerEvent.scala | 3 + .../spark/scheduler/ShuffleMapStage.scala | 11 - .../org/apache/spark/scheduler/Stage.scala | 7 + .../spark/scheduler/TaskSetManager.scala | 23 +- .../spark/scheduler/DAGSchedulerSuite.scala | 465 +++++++++++------- .../spark/scheduler/TaskSetManagerSuite.scala | 48 ++ 8 files changed, 481 insertions(+), 288 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 4ef665622245..193fc034ce16 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -292,6 +292,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala + // HashMap for storing the epoch for the mapStatuses on the driver. This will be used to + // detect and ignore any bogus fetch failures + private val epochForMapStatus = new ConcurrentHashMap[Int, Array[Long]]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) // Kept in sync with cachedSerializedStatuses explicitly @@ -370,6 +373,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } + epochForMapStatus.put(shuffleId, new Array[Long](numMaps)) // add in advance shuffleIdLocks.putIfAbsent(shuffleId, new Object()) } @@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, val array = mapStatuses(shuffleId) array.synchronized { array(mapId) = status + val epochs = epochForMapStatus.get(shuffleId).get + epochs(mapId) = epoch } } /** Register multiple map output information for the given shuffle */ def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) { - mapStatuses.put(shuffleId, statuses.clone()) if (changeEpoch) { incrementEpoch() } + mapStatuses.put(shuffleId, statuses.clone()) } /** Unregister map output information of the given shuffle, mapper and block manager */ @@ -418,6 +424,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId) } + /** Get the epoch for map output for a shuffle, if it is available */ + def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = { + val arrayOpt = mapStatuses.get(shuffleId) + if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) != null) { + return Some(epochForMapStatus.get(shuffleId).get(mapId)) + } + None + } + /** * Return the preferred hosts on which to run the given map output partition in a given shuffle, * i.e. the nodes that the most outputs for that partition are on. 
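The MapOutputTracker change above records, for every map output, the epoch at which that output was registered on the driver. The sketch below is a minimal, standalone illustration of that idea, assuming simplified bookkeeping; EpochTrackingSketch, registerMapOutput and isStaleFetchFailure are illustrative names, not the actual Spark APIs. A fetch failure reported by a reducer launched at a given epoch is stale when the map output it complains about has since been re-registered at a later epoch, so the scheduler can ignore it instead of failing the map stage again.

import scala.collection.mutable

object EpochTrackingSketch {
  // For each shuffle id, the epoch at which each map output was last registered.
  private val epochForMapStatus = mutable.Map.empty[Int, Array[Long]]

  // Record (or overwrite) the epoch of a freshly registered map output.
  def registerMapOutput(shuffleId: Int, mapId: Int, numMaps: Int, currentEpoch: Long): Unit = {
    val epochs = epochForMapStatus.getOrElseUpdate(shuffleId, new Array[Long](numMaps))
    epochs(mapId) = currentEpoch
  }

  // True when the failed output was re-registered after the reporting task was launched,
  // i.e. the failure refers to an output that has already been regenerated.
  def isStaleFetchFailure(shuffleId: Int, mapId: Int, taskEpoch: Long): Boolean =
    epochForMapStatus.get(shuffleId).map(_(mapId)).exists(_ > taskEpoch)
}

In the patch itself, getEpochForMapOutput is the lookup side of this, and the DAGScheduler change below only unregisters the output and resubmits the map stage when the recorded epoch is at most task.epoch.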
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 09717316833a..9725e592e3a6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -265,6 +265,13 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) } + /** + * Called by the TaskSetManager when a set of tasks are aborted due to fetch failure. + */ + def tasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = { + eventProcessLoop.post(TasksAborted(stageId, tasks)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times @@ -819,6 +826,16 @@ class DAGScheduler( stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) } } + private[scheduler] def handleTasksAborted( + stageId: Int, + tasks: Seq[Task[_]]): Unit = { + for (stage <- stageIdToStage.get(stageId)) { + for (task <- tasks) { + stage.pendingPartitions -= task.partitionId + } + } + } + private[scheduler] def cleanUpAfterSchedulerStop() { for (job <- activeJobs) { val error = @@ -945,12 +962,22 @@ class DAGScheduler( } } - /** Called when stage's parents are available and we can now do its task. */ + /** + * Called when stage's parents are available and we can now run its task. + * This only submits the partitions which are missing and have not been + * submitted to the lower-level scheduler for execution. + */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") - // First figure out the indexes of partition ids to compute. - val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() + val missingPartitions = stage.findMissingPartitions() + val partitionsToCompute = + missingPartitions.filter(id => !stage.pendingPartitions.contains(id)) + stage.pendingPartitions ++= partitionsToCompute + + if (partitionsToCompute.isEmpty) { + return + } // Use the scheduling pool, job group, description, etc. 
from an ActiveJob associated // with this Stage @@ -1027,11 +1054,9 @@ class DAGScheduler( val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => - stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) - stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) @@ -1057,6 +1082,7 @@ class DAGScheduler( if (tasks.size > 0) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") + logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) @@ -1160,6 +1186,7 @@ class DAGScheduler( } val stage = stageIdToStage(task.stageId) + stage.pendingPartitions -= task.partitionId event.reason match { case Success => task match { @@ -1179,6 +1206,12 @@ class DAGScheduler( cleanupStateForJobAndIndependentStages(job) listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + } else if (resultStage.pendingPartitions.isEmpty) { + logInfo("Resubmitting " + resultStage + " (" + resultStage.name + + ") because some of its tasks had failed: " + + resultStage.findMissingPartitions().mkString(", ")) + markStageAsFinished(resultStage) + submitStage(resultStage) } // taskSucceeded runs some user code that might throw an exception. Make sure @@ -1201,30 +1234,12 @@ class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { - // This task was for the currently running attempt of the stage. Since the task - // completed successfully from the perspective of the TaskSetManager, mark it as - // no longer pending (the TaskSetManager may consider the task complete even - // when the output needs to be ignored because the task's epoch is too small below. - // In this case, when pending partitions is empty, there will still be missing - // output locations, which will cause the DAGScheduler to resubmit the stage below.) - shuffleStage.pendingPartitions -= task.partitionId - } - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") - } else { - // The epoch of the task is acceptable (i.e., the task was launched after the most - // recent failure we're aware of for the executor), so mark the task's output as - // available. - shuffleStage.addOutputLoc(smt.partitionId, status) - // Remove the task's partition from pending partitions. This may have already been - // done above, but will not have been done yet in cases where the task attempt was - // from an earlier attempt of the stage (i.e., not the attempt that's currently - // running). This allows the DAGScheduler to mark the stage as complete when one - // copy of each task has finished successfully, even if the currently active stage - // still has tasks running. 
- shuffleStage.pendingPartitions -= task.partitionId - } + shuffleStage.addOutputLoc(smt.partitionId, status) + + mapOutputTracker.registerMapOutput( + shuffleStage.shuffleDep.shuffleId, + smt.partitionId, + status) if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { markStageAsFinished(shuffleStage) @@ -1281,65 +1296,11 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) - if (failedStage.latestInfo.attemptId != task.stageAttemptId) { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ID ${failedStage.latestInfo.attemptId}) running") - } else { - // It is likely that we receive multiple FetchFailed for a single stage (because we have - // multiple tasks running concurrently on different executors). In that case, it is - // possible the fetch failure has already been handled by the scheduler. - if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some(failureMessage)) - } else { - logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + - s"longer running") - } - - failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) - val shouldAbortStage = - failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest - - if (shouldAbortStage) { - val abortMessage = if (disallowStageRetryForTest) { - "Fetch failure will not retry stage due to testing config" - } else { - s"""$failedStage (${failedStage.name}) - |has failed the maximum allowable number of - |times: $maxConsecutiveStageAttempts. - |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") - } - abortStage(failedStage, abortMessage, None) - } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued - // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064 - val noResubmitEnqueued = !failedStages.contains(failedStage) - failedStages += failedStage - failedStages += mapStage - if (noResubmitEnqueued) { - // We expect one executor failure to trigger many FetchFailures in rapid succession, - // but all of those task failures can typically be handled by a single resubmission of - // the failed stage. We avoid flooding the scheduler's event queue with resubmit - // messages by checking whether a resubmit is already in the event queue for the - // failed stage. If there is already a resubmit enqueued for a different failed - // stage, that event would also be sufficient to handle the current failed stage, but - // producing a resubmit for each failed stage makes debugging and logging a little - // simpler while not producing an overwhelming number of scheduler events. - logInfo( - s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure" - ) - messageScheduler.schedule( - new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, - DAGScheduler.RESUBMIT_TIMEOUT, - TimeUnit.MILLISECONDS - ) - } - } + val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId) + // It is possible that the map output was regenerated by rerun of the stage and the + // fetch failure is being reported for stale map output. 
In that case, we should just + // ignore the fetch failure and relaunch the task with latest map output info. + if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) { // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) @@ -1352,6 +1313,61 @@ class DAGScheduler( } } + // It is likely that we receive multiple FetchFailed for a single stage (because we have + // multiple tasks running concurrently on different executors). In that case, it is + // possible the fetch failure has already been handled by the scheduler. + if (runningStages.contains(failedStage)) { + logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + + s"due to a fetch failure from $mapStage (${mapStage.name})") + markStageAsFinished(failedStage, Some(failureMessage)) + } else { + logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + + s"longer running") + } + + failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) + val shouldAbortStage = + failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest + + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: $maxConsecutiveStageAttempts. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") + } + abortStage(failedStage, abortMessage, None) + } else { + // update failedStages and make sure a ResubmitFailedStages event is enqueued + val noResubmitEnqueued = !failedStages.contains(failedStage) + failedStages += failedStage + failedStages += mapStage + if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. 
+ logInfo( + s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, + DAGScheduler.RESUBMIT_TIMEOUT, + TimeUnit.MILLISECONDS + ) + } + } + case commitDenied: TaskCommitDenied => // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits @@ -1718,6 +1734,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) + case TasksAborted(stageId, tasks) => + dagScheduler.handleTasksAborted(stageId, tasks) + case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index cda0585f154a..b2a9e3905527 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -90,4 +90,7 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent +private[scheduler] +case class TasksAborted(stageId: Int, tasks: Seq[Task[_]]) extends DAGSchedulerEvent + private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index db4d9efa2270..295b0007efb0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -49,17 +49,6 @@ private[spark] class ShuffleMapStage( private[this] var _numAvailableOutputs: Int = 0 - /** - * Partitions that either haven't yet been computed, or that were computed on an executor - * that has since been lost, so should be re-computed. This variable is used by the - * DAGScheduler to determine when a stage has completed. Task successes in both the active - * attempt for the stage or in earlier attempts for this stage can cause paritition ids to get - * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending - * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here - * will always be a subset of the partitions that the TaskSetManager thinks are pending). - */ - val pendingPartitions = new HashSet[Int] - /** * List of [[MapStatus]] for each partition. The index of the array is the map partition id, * and each value in the array is the list of possible [[MapStatus]] for a partition diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 290fd073caf2..cafb30ad586b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import scala.collection.mutable import scala.collection.mutable.HashSet import org.apache.spark.executor.TaskMetrics @@ -67,6 +68,12 @@ private[scheduler] abstract class Stage( /** Set of jobs that this stage belongs to. 
*/ val jobIds = new HashSet[Int] + /** + * Set of partitions that have been submitted to the lower-level scheduler and + * should not be resubmitted when the stage is rerun. + */ + val pendingPartitions = new HashSet[Int] + /** The ID to use for the next new attempt for this stage. */ private var nextAttemptId: Int = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a177aab5f95d..0ad5f5cbe856 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -109,8 +109,8 @@ private[spark] class TaskSetManager( // the zombie state once at least one attempt of each task has completed successfully, or if the // task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie - // state in order to continue to track and account for the running tasks. - // TODO: We should kill any running task attempts when the task set manager becomes a zombie. + // state in order to continue to track and account for the running tasks. Tasks running in a + // zombie TaskSetManager are not rerun by the DAGScheduler unless they fail. private[scheduler] var isZombie = false // Set of pending tasks for each executor. These collections are actually @@ -767,12 +767,29 @@ private[spark] class TaskSetManager( s" executor ${info.executorId}): ${reason.toErrorString}" val failureException: Option[Throwable] = reason match { case fetchFailed: FetchFailed => + if (!isZombie) { + // Only on the first fetch failure, collect all tasks that are neither running + // nor successful and notify the DAGScheduler that they have been aborted, so + // that they can be rescheduled in the retry of the stage. Note that this does + // not include the fetch-failed task itself, because that is handled separately + // by the DAGScheduler.
+ val abortedTasks = new ArrayBuffer[Task[_]] + for (i <- 0 until numTasks) { + if (i != index && !successful(i) && copiesRunning(i) == 0) { + abortedTasks += taskSet.tasks(i) + } + } + if (!abortedTasks.isEmpty) { + sched.dagScheduler.tasksAborted(abortedTasks(0).stageId, abortedTasks) + } + isZombie = true + } + logWarning(failureReason) if (!successful(index)) { successful(index) = true tasksSuccessful += 1 } - isZombie = true None case ef: ExceptionFailure => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe..9adea4107101 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -606,6 +606,277 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("Test no duplicate shuffle map tasks running on fetch failure (SPARK-14649)") { + val firstRDD = new MyRDD(sc, 2, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + + // things start out smoothly, stage 0 completes with no issues + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)), + (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length)) + )) + + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // fail taks 0 in stage 1 due to fetch failure + val failedTask1 = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask1, + FetchFailed(makeBlockManagerId("hostA"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // abort task 1 in stage 1 due to fetch failure, task 2 still running + val abortedTask1 = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask1))) + + // Make sure that we still have 2 running tasks for the first attempt + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // so we resubmit stage 0, which completes happily + val stage0Resubmit1 = taskSets(2) + assert(stage0Resubmit1.stageId == 0) + assert(stage0Resubmit1.stageAttemptId === 1) + val task1 = stage0Resubmit1.tasks(0) + runEvent(makeCompletionEvent( + task1, + Success, + makeMapStatus("hostC", shuffleMapRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + // we will now have a task set representing + // the second attempt for stage 1, but we *also* have 1 task for the first attempt for + // stage 1 still going, so we make sure that we don't resubmit the already running tasks. 
+ val stage1Resubmit1 = taskSets(3) + assert(stage1Resubmit1.stageId == 1) + assert(stage1Resubmit1.stageAttemptId === 1) + assert(stage1Resubmit1.tasks.length === 2) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // Now fail the running task from the first attempt and + // succeed all others + val succeededTask1 = taskSets(3).tasks(0) + runEvent(makeCompletionEvent( + succeededTask1, + Success, + makeMapStatus("hostC", reduceRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + val failedTask2 = taskSets(1).tasks(2) + runEvent(makeCompletionEvent( + failedTask2, + FetchFailed(makeBlockManagerId("hostB"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + val succeededTask2 = taskSets(3).tasks(1) + runEvent(makeCompletionEvent( + succeededTask2, + Success, + makeMapStatus("hostC", reduceRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + // Sleep for some time for the completion event to be processed by the DAGScheduler + // and make sure that stage 0 is resubmitted. + Thread.sleep(1000) + val stage0Resubmit2 = taskSets(4) + assert(stage0Resubmit2.stageId == 0) + assert(stage0Resubmit2.stageAttemptId === 2) + assert(stage0Resubmit2.tasks.length === 1) + val task2 = stage0Resubmit2.tasks(0) + runEvent(makeCompletionEvent( + task2, + Success, + makeMapStatus("hostC", shuffleMapRdd.partitions.length))) + + Thread.sleep(1000) + // Make sure we resubmit only the failed task in stage 1. + val stage1Resubmit2 = taskSets(5) + assert(stage1Resubmit2.stageId == 1) + assert(stage1Resubmit2.stageAttemptId === 2) + assert(stage1Resubmit2.tasks.length === 1) + } + + test("Test no duplicate shuffle reduce tasks running on fetch failure (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. + assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will still be in the running state, so + // it should not be resubmitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostC", reduceRdd.partitions.length)))) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task.
+ assert(taskSets(3).tasks.size == 2) + + // Finish the taskSet(3) successfully + runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, 42, + Seq.empty, createFakeTaskInfo())) + + runEvent(makeCompletionEvent( + taskSets(3).tasks(1), Success, 42, + Seq.empty, createFakeTaskInfo())) + + // Fail the running task in taskSets(1) and make sure it is resubmitted + runEvent(makeCompletionEvent( + taskSets(1).tasks(2), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // Make sure that we resubmit the failed reduce task + Thread.sleep(1000) + assert(taskSets(4).tasks.size == 1) + } + + test("Test fetch failure on stale map output does not " + + "cause resubmission of the stage (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. + assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will still be in the running state, so + // it should not be resubmitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) + + + Thread.sleep(1000) + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + assert(taskSets(3).tasks.size == 2) + + // Fetch fail taskSet(1).tasks(2) for mapId 0, which has already been re-computed + // by the rerun of stage 0. This fetch failure should be ignored and should not + // trigger another rerun of stage 0. + runEvent(makeCompletionEvent( + taskSets(1).tasks(2), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 2, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task.
+ + assert(taskSets(4).tasks.size == 1) + assert(taskSets(4).tasks(0).stageId == 1) + assert(taskSets(4).tasks(0).stageAttemptId == 2) + } + + test("Test task rerun in case of failure in zombie taskSet (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. + assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will be still running state so + // it should not be resumbitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostC", reduceRdd.partitions.length)))) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + assert(taskSets(3).tasks.size == 2) + + // Fail the running tasks in taskSets(1) with ExceptionFailure. Since the + // taskSet(1) is in zombie state now, the DagScheduler should rerun the + // failed task. + + val exceptionFailure = new ExceptionFailure( + new SparkException("fondue?"), Seq.empty) + + runEvent(makeCompletionEvent(taskSets(1).tasks(2), exceptionFailure, "result")) + + // Finish the taskSet(3) successfully + runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, 42, + Seq.empty, createFakeTaskInfo())) + + runEvent(makeCompletionEvent( + taskSets(3).tasks(1), Success, 42, + Seq.empty, createFakeTaskInfo())) + + // Make sure that we resubmit the failed reduce task + Thread.sleep(1000) + assert(taskSets(4).tasks.size == 1) + } + test("run trivial shuffle with fetch failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) @@ -1084,6 +1355,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // Trigger resubmission of the failed map stage and finish the re-started map task. runEvent(ResubmitFailedStages) + assert(taskSets(2).tasks.size === 1) complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) // Because the map stage finished, another attempt for the reduce stage should have been @@ -1102,8 +1374,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // the FetchFailed should have been ignored runEvent(ResubmitFailedStages) - // The FetchFailed from the original reduce stage should be ignored. - assert(countSubmittedMapStageAttempts() === 2) + // The FetchFailed from the original reduce stage should not be ignored. 
+ assert(countSubmittedMapStageAttempts() === 3) } test("task events always posted in speculation / when stage is killed") { @@ -1152,63 +1424,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(sparkListener.endedTasks.size == 6) } - test("ignore late map task completions") { - val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) - submit(reduceRdd, Array(0, 1)) - - // pretend we were told hostA went away - val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - val newEpoch = mapOutputTracker.getEpoch - assert(newEpoch > oldEpoch) - - // now start completing some tasks in the shuffle map stage, under different hosts - // and epochs, and make sure scheduler updates its state correctly - val taskSet = taskSets(0) - val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] - assert(shuffleStage.numAvailableOutputs === 0) - - // should be ignored for being too old - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 0) - - // should work because it's a non-failed host (so the available map outputs will increase) - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostB", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 1) - - // should be ignored for being too old - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 1) - - // should work because it's a new epoch, which will increase the number of available map - // outputs, and also finish the stage - taskSet.tasks(1).epoch = newEpoch - runEvent(makeCompletionEvent( - taskSet.tasks(1), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 2) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) - - // finish the next stage normally, which completes the job - complete(taskSets(1), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) - assertDataStructuresEmpty() - } - test("run shuffle with map stage failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) @@ -1312,11 +1527,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // now here is where things get tricky : we will now have a task set representing // the second attempt for stage 1, but we *also* have some tasks for the first attempt for - // stage 1 still going + // stage 1 still going, so we make sure that we don't resubmit the already running tasks. 
val stage1Resubmit = taskSets(3) assert(stage1Resubmit.stageId == 1) assert(stage1Resubmit.stageAttemptId === 1) - assert(stage1Resubmit.tasks.length === 3) + assert(stage1Resubmit.tasks.length === 1) // we'll have some tasks finish from the first attempt, and some finish from the second attempt, // so that we actually have all stage outputs, though no attempt has completed all its @@ -1326,7 +1541,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou Success, makeMapStatus("hostC", reduceRdd.partitions.length))) runEvent(makeCompletionEvent( - taskSets(3).tasks(1), + taskSets(1).tasks(1), Success, makeMapStatus("hostC", reduceRdd.partitions.length))) // late task finish from the first attempt @@ -1575,50 +1790,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - /** - * In this test, we run a map stage where one of the executors fails but we still receive a - * "zombie" complete message from a task that ran on that executor. We want to make sure the - * stage is resubmitted so that the task that ran on the failed executor is re-executed, and - * that the stage is only marked as finished once that task completes. - */ - test("run trivial shuffle with out-of-band executor failure and retry") { - val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) - submit(reduceRdd, Array(0)) - // Tell the DAGScheduler that hostA was lost. - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) - - // At this point, no more tasks are running for the stage (and the TaskSetManager considers the - // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler - // should re-submit the stage with one task (the task that originally ran on HostA). - assert(taskSets.size === 2) - assert(taskSets(1).tasks.size === 1) - - // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce - // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on - // alive executors). - assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask]) - - // have hostC complete the resubmitted task - complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) - - // Make sure that the reduce stage was now submitted. - assert(taskSets.size === 3) - assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]]) - - // Complete the reduce stage. - complete(taskSets(2), Seq((Success, 42))) - assert(results === Map(0 -> 42)) - assertDataStructuresEmpty() - } - test("recursive shuffle failures") { val shuffleOneRdd = new MyRDD(sc, 2, Nil) val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2)) @@ -2054,73 +2225,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - /** - * In this test, we run a map stage where one of the executors fails but we still receive a - * "zombie" complete message from that executor. We want to make sure the stage is not reported - * as done until all tasks have completed. 
- * - * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band - * executor failure and retry". However, that test uses ShuffleMapStages that are followed by - * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a - * ResultStage after it. - */ - test("map stage submission with executor failure late map task completions") { - val shuffleMapRdd = new MyRDD(sc, 3, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - - submitMapStage(shuffleDep) - - val oldTaskSet = taskSets(0) - runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2))) - assert(results.size === 0) // Map stage job should not be complete yet - - // Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it - // completed on hostA. - val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - val newEpoch = mapOutputTracker.getEpoch - assert(newEpoch > oldEpoch) - - // Suppose we also get a completed event from task 1 on the same host; this should be ignored - runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2))) - assert(results.size === 0) // Map stage job should not be complete yet - - // A completion from another task should work because it's a non-failed host - runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2))) - - // At this point, no more tasks are running for the stage (and the TaskSetManager considers - // the stage complete), but the task that ran on hostA needs to be re-run, so the map stage - // shouldn't be marked as complete, and the DAGScheduler should re-submit the stage. - assert(results.size === 0) - assert(taskSets.size === 2) - - // Now complete tasks in the second task set - val newTaskSet = taskSets(1) - // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA). - assert(newTaskSet.tasks.size === 2) - // Complete task 0 from the original task set (i.e., not hte one that's currently active). - // This should still be counted towards the job being complete (but there's still one - // outstanding task). - runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2))) - assert(results.size === 0) - - // Complete the final task, from the currently active task set. There's still one - // running task, task 0 in the currently active stage attempt, but the success of task 0 means - // the DAGScheduler can mark the stage as finished. 
- runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2))) - assert(results.size === 1) // Map stage job should now finally be complete - assertDataStructuresEmpty() - - // Also test that a reduce stage using this shuffled data can immediately run - val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) - results.clear() - submit(reduceRDD, Array(0, 1)) - complete(taskSets(2), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) - results.clear() - assertDataStructuresEmpty() - } - /** * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular @@ -2246,28 +2350,19 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou (Success, makeMapStatus("hostB", 2)))) // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA - // successfully. The success should be ignored because the task started before the - // executor failed, so the output may have been lost. + // successfully. runEvent(makeCompletionEvent( taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2))) - // Both tasks in rddB should be resubmitted, because none of them has succeeded truely. + // Only one task in rddB should be resubmitted, because one of them already completed. // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully. // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt // is still running. assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 - && taskSets(3).tasks.size === 2) + && taskSets(3).tasks.size === 1) runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) - // There should be no new attempt of stage submitted, - // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in - // the current attempt (and hasn't completed successfully in any earlier attempts). - assert(taskSets.size === 4) - - // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully. - runEvent(makeCompletionEvent( - taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2))) // Now the ResultStage should be submitted, because all of the tasks of rddB have // completed successfully on alive executors. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 132caef0978f..76368e17d347 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -34,11 +34,17 @@ import org.apache.spark.util.{AccumulatorV2, ManualClock} class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) extends DAGScheduler(sc) { + val abortedPartitions = new mutable.HashSet[Int] override def taskStarted(task: Task[_], taskInfo: TaskInfo) { taskScheduler.startedTasks += taskInfo.index } + override def tasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = { + for (task <- tasks) { + abortedPartitions += task.partitionId + } + } override def taskEnded( task: Task[_], reason: TaskEndReason, @@ -415,6 +421,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } } + test("pending tasks should be aborted after first fetch failure") { + val rescheduleDelay = 300L + val conf = new SparkConf(). 
+ set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). + // don't wait to jump locality levels in this test + set("spark.locality.wait", "0") + + sc = new SparkContext("local", "test", conf) + // two executors on same host, one on different. + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec1.1", "host1"), ("exec2", "host2")) + // affinity to exec1 on host1 - which we will fail. + val taskSet = FakeTask.createTaskSet(4) + val clock = new ManualClock + val manager = new TaskSetManager(sched, taskSet, 4, None, clock) + + val offerResult1 = manager.resourceOffer("exec1", "host1", ANY) + assert(offerResult1.isDefined, "Expect resource offer to return a task") + + assert(offerResult1.get.index === 0) + assert(offerResult1.get.executorId === "exec1") + + val offerResult2 = manager.resourceOffer("exec2", "host2", ANY) + assert(offerResult2.isDefined, "Expect resource offer to return a task") + + assert(offerResult2.get.index === 1) + assert(offerResult2.get.executorId === "exec2") + // At this point, we have 2 tasks running and 2 pending. First fetch failure should + // abort all the pending tasks but the running tasks should not be aborted. + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + val dagScheduler = sched.dagScheduler.asInstanceOf[FakeDAGScheduler] + assert(dagScheduler.abortedPartitions.size === 2) + + dagScheduler.abortedPartitions.clear() + // Second fetch failure should not notify the DagScheduler of the aborted tasks. + + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + assert(dagScheduler.abortedPartitions.size === 0) + } + test("executors should be blacklisted after task failure, in spite of locality preferences") { val rescheduleDelay = 300L val conf = new SparkConf(). 
From 2da37de60aa8ed6e2ab52bcda79461a92faa2fb7 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Wed, 15 Mar 2017 17:23:08 -0700 Subject: [PATCH 02/11] Remove unneeded check for conflicting taskSet --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 07aea773fa63..0578ce7a7c08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -195,13 +195,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager - val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => - ts.taskSet != taskSet && !ts.isZombie - } - if (conflictingTaskSet) { - throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + - s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") - } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { From 8f98ff1a7935b68a72ab7b3cd42bd3bbbf7fa76d Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 21 Mar 2017 17:24:27 -0700 Subject: [PATCH 03/11] Address review comments and fix commiter issue --- .../org/apache/spark/MapOutputTracker.scala | 14 +++--- .../apache/spark/scheduler/DAGScheduler.scala | 20 ++++---- .../scheduler/OutputCommitCoordinator.scala | 8 +++- .../spark/scheduler/DAGSchedulerSuite.scala | 46 +++++++++++++++++++ .../OutputCommitCoordinatorSuite.scala | 4 +- 5 files changed, 71 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 193fc034ce16..a6f6f2a1dbfa 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -382,17 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, val array = mapStatuses(shuffleId) array.synchronized { array(mapId) = status - val epochs = epochForMapStatus.get(shuffleId).get + val epochs = epochForMapStatus(shuffleId) epochs(mapId) = epoch } } /** Register multiple map output information for the given shuffle */ def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) { + mapStatuses.put(shuffleId, statuses.clone()) if (changeEpoch) { incrementEpoch() } - mapStatuses.put(shuffleId, statuses.clone()) } /** Unregister map output information of the given shuffle, mapper and block manager */ @@ -426,11 +426,11 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, /** Get the epoch for map output for a shuffle, if it is available */ def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = { - val arrayOpt = mapStatuses.get(shuffleId) - if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) != null) { - return Some(epochForMapStatus.get(shuffleId).get(mapId)) - } - None + for { + mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray => + Option(mapStatusArray(mapId)) + } + } yield epochForMapStatus(shuffleId)(mapId) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 
9725e592e3a6..b6b587a3b766 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -829,11 +829,10 @@ class DAGScheduler( private[scheduler] def handleTasksAborted( stageId: Int, tasks: Seq[Task[_]]): Unit = { - for (stage <- stageIdToStage.get(stageId)) { - for (task <- tasks) { - stage.pendingPartitions -= task.partitionId - } - } + for { + stage <- stageIdToStage.get(stageId) + task <- tasks + } stage.pendingPartitions -= task.partitionId } private[scheduler] def cleanUpAfterSchedulerStop() { @@ -971,8 +970,7 @@ class DAGScheduler( logDebug("submitMissingTasks(" + stage + ")") val missingPartitions = stage.findMissingPartitions() - val partitionsToCompute = - missingPartitions.filter(id => !stage.pendingPartitions.contains(id)) + val partitionsToCompute = missingPartitions.filterNot(stage.pendingPartitions) stage.pendingPartitions ++= partitionsToCompute if (partitionsToCompute.isEmpty) { @@ -990,10 +988,11 @@ class DAGScheduler( // event. stage match { case s: ShuffleMapStage => - outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) + outputCommitCoordinator.stageStart(stage = s.id, partitionsToCompute, + maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( - stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) + stage = s.id, partitionsToCompute, maxPartitionId = s.rdd.partitions.length - 1) } val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { @@ -1300,7 +1299,8 @@ class DAGScheduler( // It is possible that the map output was regenerated by rerun of the stage and the // fetch failure is being reported for stale map output. In that case, we should just // ignore the fetch failure and relaunch the task with latest map output info. - if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) { + for(epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId) if + epochForMapOutput <= task.epoch) { // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 83d87b548a43..b1c37ad93a08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -109,8 +109,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). 
*/ - private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized { - stageStates(stage) = new StageState(maxPartitionId + 1) + private[scheduler] def stageStart(stage: StageId, partitionsToCompute: Seq[Int], maxPartitionId: Int): Unit = synchronized { + // stageStates(stage) = new StageState(maxPartitionId + 1) + val stageState = stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1)) + for (i <- partitionsToCompute) { + stageState.authorizedCommitters(i) = NO_AUTHORIZED_COMMITTER + } } // Called by DAGScheduler diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9adea4107101..09b35d7306a0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -704,6 +704,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(stage1Resubmit2.tasks.length === 1) } + test("Test allow commit from a task of a stage that failed due to fetch failure (SPARK-14649)") { + val firstRDD = new MyRDD(sc, 2, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + + // things start out smoothly, stage 0 completes with no issues + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)), + (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length)) + )) + + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // fail task 0 in stage 1 due to fetch failure + val failedTask1 = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask1, + FetchFailed(makeBlockManagerId("hostA"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // abort task 1 in stage 1 due to fetch failure, task 2 still running + val abortedTask1 = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask1))) + + // Make sure that we still have 2 running tasks for the first attempt + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // so we resubmit stage 0, which completes happily + val stage0Resubmit1 = taskSets(2) + assert(stage0Resubmit1.stageId == 0) + assert(stage0Resubmit1.stageAttemptId === 1) + + // Now the running task from the first attempt tries to commit the output and we + // make sure it succeeds.
+ assert(scheduler.outputCommitCoordinator.canCommit(1, + taskSets(1).tasks(2).partitionId, 0)) + } + test("Test no duplicate shuffle reduce tasks running on fetch failure (SPARK-14649)") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index e51e6a0d3ff6..1772afd811ca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -170,7 +170,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val partition: Int = 2 val authorizedCommitter: Int = 3 val nonAuthorizedCommitter: Int = 100 - outputCommitCoordinator.stageStart(stage, maxPartitionId = 2) + outputCommitCoordinator.stageStart(stage, Array(1, 2), maxPartitionId = 2) assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter)) assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter)) @@ -201,7 +201,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val stage: Int = 1 val partition: Int = 1 val failedAttempt: Int = 0 - outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + outputCommitCoordinator.stageStart(stage, Array(1), maxPartitionId = 1) outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt, reason = ExecutorLostFailure("0", exitCausedByApp = true, None)) assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt)) From e2661ff712d1f2a7a6b321141ffd6ea3fb2316e4 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 21 Mar 2017 23:17:03 -0700 Subject: [PATCH 04/11] Fix tests and build --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - .../apache/spark/scheduler/OutputCommitCoordinator.scala | 9 +++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b6b587a3b766..445a03ab784f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1484,7 +1484,6 @@ class DAGScheduler( logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") } - outputCommitCoordinator.stageEnd(stage.id) listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index b1c37ad93a08..1dfb623b228a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -109,7 +109,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). 
*/ - private[scheduler] def stageStart(stage: StageId, partitionsToCompute: Seq[Int], maxPartitionId: Int): Unit = synchronized { + private[scheduler] def stageStart(stage: StageId, + partitionsToCompute: Seq[Int], + maxPartitionId: Int): Unit = synchronized { // stageStates(stage) = new StageState(maxPartitionId + 1) val stageState = stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1)) for (i <- partitionsToCompute) { @@ -117,11 +119,6 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) } } - // Called by DAGScheduler - private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { - stageStates.remove(stage) - } - // Called by DAGScheduler private[scheduler] def taskCompleted( stage: StageId, From 8303f2edfb31a78291f4a99871eadb2d9308c24b Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 23 Mar 2017 11:27:03 -0700 Subject: [PATCH 05/11] Fix tests --- .../apache/spark/scheduler/DAGScheduler.scala | 4 +- .../scheduler/OutputCommitCoordinator.scala | 11 +--- .../spark/scheduler/TaskSetManager.scala | 4 ++ .../scheduler/BlacklistIntegrationSuite.scala | 65 +------------------ .../spark/scheduler/DAGSchedulerSuite.scala | 10 +-- 5 files changed, 11 insertions(+), 83 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 445a03ab784f..af731a7c6fbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1185,9 +1185,9 @@ class DAGScheduler( } val stage = stageIdToStage(task.stageId) - stage.pendingPartitions -= task.partitionId event.reason match { case Success => + stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask @@ -1284,7 +1284,6 @@ class DAGScheduler( logInfo("Resubmitted " + task + ", so marking it as still running") stage match { case sms: ShuffleMapStage => - sms.pendingPartitions += task.partitionId case _ => assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + @@ -1292,6 +1291,7 @@ class DAGScheduler( } case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => + stage.pendingPartitions -= task.partitionId val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 1dfb623b228a..3625f25aadff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -59,20 +59,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * exclusive lock on committing task output for that partition, as well as any known failed * attempts in the stage. * - * Entries are added to the top-level map when stages start and are removed they finish - * (either successfully or unsuccessfully). + * Entries are added to the top-level map when stages start. * * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. */ private val stageStates = mutable.Map[StageId, StageState]() - /** - * Returns whether the OutputCommitCoordinator's internal data structures are all empty. 
- */ - def isEmpty: Boolean = { - stageStates.isEmpty - } - /** * Called by tasks to ask whether they can commit their output to HDFS. * @@ -112,7 +104,6 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private[scheduler] def stageStart(stage: StageId, partitionsToCompute: Seq[Int], maxPartitionId: Int): Unit = synchronized { - // stageStates(stage) = new StageState(maxPartitionId + 1) val stageState = stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1)) for (i <- partitionsToCompute) { stageState.authorizedCommitters(i) = NO_AUTHORIZED_COMMITTER diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 0ad5f5cbe856..146dba31384f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -842,6 +842,10 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) + if (reason != Success && isZombie) { + sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index))) + } + if (successful(index)) { logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" + s" be re-executed (either because the task failed with a shuffle data fetch failure," + diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index f6015cd51c2b..0d19d912af28 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.config class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{ val badHost = "host-0" - val duration = Duration(10, SECONDS) + val duration = Duration(10, MINUTES) /** * This backend just always fails if the task is executed on a bad host, but otherwise succeeds @@ -40,44 +40,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM } } - // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling - // according to locality preferences, and so the job fails - testScheduler("If preferred node is bad, without blacklist job will fail", - extraConfs = Seq( - config.BLACKLIST_ENABLED.key -> "false" - )) { - val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) - withBackend(badHostBackend _) { - val jobFuture = submit(rdd, (0 until 10).toArray) - awaitJobTermination(jobFuture, duration) - } - assertDataStructuresEmpty(noFailure = false) - } - - testScheduler( - "With default settings, job can succeed despite multiple bad executors on node", - extraConfs = Seq( - config.BLACKLIST_ENABLED.key -> "true", - config.MAX_TASK_FAILURES.key -> "4", - "spark.testing.nHosts" -> "2", - "spark.testing.nExecutorsPerHost" -> "5", - "spark.testing.nCoresPerExecutor" -> "10" - ) - ) { - // To reliably reproduce the failure that would occur without blacklisting, we have to use 1 - // task. That way, we ensure this 1 task gets rotated through enough bad executors on the host - // to fail the taskSet, before we have a bunch of different tasks fail in the executors so we - // blacklist them. 
- // But the point here is -- without blacklisting, we would never schedule anything on the good - // host-1 before we hit too many failures trying our preferred host-0. - val rdd = new MockRDDWithLocalityPrefs(sc, 1, Nil, badHost) - withBackend(badHostBackend _) { - val jobFuture = submit(rdd, (0 until 1).toArray) - awaitJobTermination(jobFuture, duration) - } - assertDataStructuresEmpty(noFailure = true) - } - // Here we run with the blacklist on, and the default config takes care of having this // robust to one bad node. testScheduler( @@ -97,31 +59,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = true) } - // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job - // doesn't hang - testScheduler( - "SPARK-15865 Progress with fewer executors than maxTaskFailures", - extraConfs = Seq( - config.BLACKLIST_ENABLED.key -> "true", - "spark.testing.nHosts" -> "2", - "spark.testing.nExecutorsPerHost" -> "1", - "spark.testing.nCoresPerExecutor" -> "1" - ) - ) { - def runBackend(): Unit = { - val (taskDescription, _) = backend.beginTask() - backend.taskFailed(taskDescription, new RuntimeException("test task failure")) - } - withBackend(runBackend _) { - val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) - awaitJobTermination(jobFuture, duration) - val pattern = ("Aborting TaskSet 0.0 because task .* " + - "cannot run anywhere due to node and executor blacklist").r - assert(pattern.findFirstIn(failure.getMessage).isDefined, - s"Couldn't find $pattern in ${failure.getMessage()}") - } - assertDataStructuresEmpty(noFailure = false) - } } class MultiExecutorMockBackend( diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 09b35d7306a0..f574e9cc18df 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -900,14 +900,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // running task. assert(taskSets(3).tasks.size == 2) - // Fail the running tasks in taskSets(1) with ExceptionFailure. Since the + // Abort the running tasks in taskSets(1) (due to some exception failure). Since the // taskSet(1) is in zombie state now, the DagScheduler should rerun the - // failed task. + // aborted task. 
- val exceptionFailure = new ExceptionFailure( - new SparkException("fondue?"), Seq.empty) - - runEvent(makeCompletionEvent(taskSets(1).tasks(2), exceptionFailure, "result")) + runEvent(new TasksAborted(1, List(taskSets(1).tasks(2)))) // Finish the taskSet(3) successfully runEvent(makeCompletionEvent( @@ -2438,7 +2435,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.runningStages.isEmpty) assert(scheduler.shuffleIdToMapStage.isEmpty) assert(scheduler.waitingStages.isEmpty) - assert(scheduler.outputCommitCoordinator.isEmpty) } // Nothing in this test should break if the task info's fields are null, but From 3e878dd56ac672df1a80321e6e53c8fda087188f Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 23 Mar 2017 15:46:31 -0700 Subject: [PATCH 06/11] Fix tests --- .../spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/BlacklistIntegrationSuite.scala | 65 ++++++++++++++++++- .../spark/scheduler/TaskSetManagerSuite.scala | 44 +++++++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 146dba31384f..668e9c2a7441 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -842,7 +842,7 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) - if (reason != Success && isZombie) { + if (reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie) { sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index))) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 0d19d912af28..f6015cd51c2b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.config class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{ val badHost = "host-0" - val duration = Duration(10, MINUTES) + val duration = Duration(10, SECONDS) /** * This backend just always fails if the task is executed on a bad host, but otherwise succeeds @@ -40,6 +40,44 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM } } + // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling + // according to locality preferences, and so the job fails + testScheduler("If preferred node is bad, without blacklist job will fail", + extraConfs = Seq( + config.BLACKLIST_ENABLED.key -> "false" + )) { + val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) + withBackend(badHostBackend _) { + val jobFuture = submit(rdd, (0 until 10).toArray) + awaitJobTermination(jobFuture, duration) + } + assertDataStructuresEmpty(noFailure = false) + } + + testScheduler( + "With default settings, job can succeed despite multiple bad executors on node", + extraConfs = Seq( + config.BLACKLIST_ENABLED.key -> "true", + config.MAX_TASK_FAILURES.key -> "4", + "spark.testing.nHosts" -> "2", + "spark.testing.nExecutorsPerHost" -> "5", + "spark.testing.nCoresPerExecutor" -> "10" + ) + ) { + // To reliably reproduce the failure that would occur without blacklisting, we have to use 1 + // task. 
That way, we ensure this 1 task gets rotated through enough bad executors on the host + // to fail the taskSet, before we have a bunch of different tasks fail in the executors so we + // blacklist them. + // But the point here is -- without blacklisting, we would never schedule anything on the good + // host-1 before we hit too many failures trying our preferred host-0. + val rdd = new MockRDDWithLocalityPrefs(sc, 1, Nil, badHost) + withBackend(badHostBackend _) { + val jobFuture = submit(rdd, (0 until 1).toArray) + awaitJobTermination(jobFuture, duration) + } + assertDataStructuresEmpty(noFailure = true) + } + // Here we run with the blacklist on, and the default config takes care of having this // robust to one bad node. testScheduler( @@ -59,6 +97,31 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = true) } + // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job + // doesn't hang + testScheduler( + "SPARK-15865 Progress with fewer executors than maxTaskFailures", + extraConfs = Seq( + config.BLACKLIST_ENABLED.key -> "true", + "spark.testing.nHosts" -> "2", + "spark.testing.nExecutorsPerHost" -> "1", + "spark.testing.nCoresPerExecutor" -> "1" + ) + ) { + def runBackend(): Unit = { + val (taskDescription, _) = backend.beginTask() + backend.taskFailed(taskDescription, new RuntimeException("test task failure")) + } + withBackend(runBackend _) { + val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) + awaitJobTermination(jobFuture, duration) + val pattern = ("Aborting TaskSet 0.0 because task .* " + + "cannot run anywhere due to node and executor blacklist").r + assert(pattern.findFirstIn(failure.getMessage).isDefined, + s"Couldn't find $pattern in ${failure.getMessage()}") + } + assertDataStructuresEmpty(noFailure = false) + } } class MultiExecutorMockBackend( diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 76368e17d347..adf4e2228b69 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -435,6 +435,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // affinity to exec1 on host1 - which we will fail. val taskSet = FakeTask.createTaskSet(4) val clock = new ManualClock + clock.advance(1) val manager = new TaskSetManager(sched, taskSet, 4, None, clock) val offerResult1 = manager.resourceOffer("exec1", "host1", ANY) @@ -463,6 +464,49 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(dagScheduler.abortedPartitions.size === 0) } + test("Failed tasks should be aborted after fetch failure") { + val rescheduleDelay = 300L + val conf = new SparkConf(). + set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). + // don't wait to jump locality levels in this test + set("spark.locality.wait", "0") + + sc = new SparkContext("local", "test", conf) + // two executors on same host, one on different. + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec1.1", "host1"), ("exec2", "host2")) + // affinity to exec1 on host1 - which we will fail. 
+ val taskSet = FakeTask.createTaskSet(4) + val clock = new ManualClock + clock.advance(1) + val manager = new TaskSetManager(sched, taskSet, 4, None, clock) + + val offerResult1 = manager.resourceOffer("exec1", "host1", ANY) + assert(offerResult1.isDefined, "Expect resource offer to return a task") + + assert(offerResult1.get.index === 0) + assert(offerResult1.get.executorId === "exec1") + + val offerResult2 = manager.resourceOffer("exec2", "host2", ANY) + assert(offerResult2.isDefined, "Expect resource offer to return a task") + + assert(offerResult2.get.index === 1) + assert(offerResult2.get.executorId === "exec2") + // At this point, we have 2 tasks running and 2 pending. First fetch failure should + // abort all the pending tasks but the running tasks should not be aborted. + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + val dagScheduler = sched.dagScheduler.asInstanceOf[FakeDAGScheduler] + assert(dagScheduler.abortedPartitions.size === 2) + + dagScheduler.abortedPartitions.clear() + + val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), Seq.empty) + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, exceptionFailure) + + assert(dagScheduler.abortedPartitions.size === 1) + } + test("executors should be blacklisted after task failure, in spite of locality preferences") { val rescheduleDelay = 300L val conf = new SparkConf(). From 2ac4b34b96eb3721888fa74bd14e71de53c96cce Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 23 Mar 2017 16:09:34 -0700 Subject: [PATCH 07/11] Minor changes --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++++ .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 668e9c2a7441..2123d511fbfe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -843,6 +843,10 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) if (reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie) { + // If the TaskSetManager is in zombie mode, we should inform the DAGScheduler to abort + // the task in case of failure so that the DagScheduler can rerun it in the retry of the stage. + // Please note that we exclude fetch failed tasks, because they are handled by the DAGScheduler + // separately. sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index))) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index adf4e2228b69..ec17710b33ad 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -464,7 +464,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(dagScheduler.abortedPartitions.size === 0) } - test("Failed tasks should be aborted after fetch failure") { + test("Failed tasks should be aborted in zombie mode") { val rescheduleDelay = 300L val conf = new SparkConf(). set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). 
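[Editor's note] For readers following the two commits above, here is a minimal, self-contained sketch of the rule they introduce: a zombie task set reports non-fetch-failed task failures back to the scheduler so the retried stage attempt can re-run those partitions. All names below (ZombieAbortSketch, FakeDagScheduler, FakeTaskSetManager, the toy TaskEndReason ADT) are hypothetical stand-ins for illustration only; they are not part of the patch or of Spark's API.

object ZombieAbortSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case class FetchFailed(mapId: Int) extends TaskEndReason
  case class ExceptionFailure(msg: String) extends TaskEndReason

  case class Task(stageId: Int, partitionId: Int)

  // Stand-in for the DAGScheduler side: it only needs to learn which
  // partitions must be handed back for re-execution.
  class FakeDagScheduler {
    var aborted: List[Task] = Nil
    def tasksAborted(stageId: Int, tasks: Seq[Task]): Unit = { aborted = aborted ++ tasks }
  }

  class FakeTaskSetManager(dag: FakeDagScheduler, var isZombie: Boolean) {
    def handleTaskCompletion(task: Task, reason: TaskEndReason): Unit = reason match {
      // Fetch failures are excluded: the scheduler already reacts to them by
      // failing the stage and resubmitting the map stage.
      case Success | FetchFailed(_) => ()
      case _ if isZombie            => dag.tasksAborted(task.stageId, Seq(task))
      case _                        => () // non-zombie failures are retried within the task set
    }
  }

  def main(args: Array[String]): Unit = {
    val dag = new FakeDagScheduler
    val tsm = new FakeTaskSetManager(dag, isZombie = true)
    tsm.handleTaskCompletion(Task(stageId = 1, partitionId = 2), ExceptionFailure("boom"))
    tsm.handleTaskCompletion(Task(stageId = 1, partitionId = 3), FetchFailed(mapId = 0))
    // Only the ExceptionFailure task is reported back for re-run.
    assert(dag.aborted.map(_.partitionId) == List(2))
    println(s"partitions reported for re-run: ${dag.aborted.map(_.partitionId)}")
  }
}

In the patch itself the corresponding hook is the sched.dagScheduler.tasksAborted(...) call in TaskSetManager, guarded by reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie.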
From d32856d5f26f037e91cff8dbf3e603bd444755a5 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 23 Mar 2017 16:21:12 -0700 Subject: [PATCH 08/11] Fix check style --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2123d511fbfe..44f7c00ad43f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -844,9 +844,9 @@ private[spark] class TaskSetManager( if (reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie) { // If the TaskSetManager is in zombie mode, we should inform the DAGScheduler to abort - // the task in case of failure so that the DagScheduler can rerun it in the retry of the stage. - // Please note that we exclude fetch failed tasks, because they are handled by the DAGScheduler - // separately. + // the task in case of failure so that the DagScheduler can rerun it in the retry of + // the stage. Please note that we exclude fetch failed tasks, because they are handled + // by the DAGScheduler separately. sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index))) } From 1e6e88a37001bd2f026eff1bd8db6adb5e9bf796 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Mon, 27 Mar 2017 17:15:27 -0700 Subject: [PATCH 09/11] May be fix tests? --- .../scala/org/apache/spark/MapOutputTracker.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a6f6f2a1dbfa..c5f6d25a24e8 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -426,11 +426,16 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, /** Get the epoch for map output for a shuffle, if it is available */ def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = { - for { - mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray => - Option(mapStatusArray(mapId)) + val arrayOpt = mapStatuses.get(shuffleId) + if (arrayOpt.isDefined && arrayOpt.get != null) { + val array = arrayOpt.get + array.synchronized { + if (array(mapId) != null) { + return Some(epochForMapStatus(shuffleId)(mapId)) + } } - } yield epochForMapStatus(shuffleId)(mapId) + } + None } /** From bdaff123dd21feff72218d8163fa1a69e45f1a1e Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 28 Mar 2017 18:00:14 -0700 Subject: [PATCH 10/11] Fix tests --- .../org/apache/spark/MapOutputTracker.scala | 16 +++++++--------- .../apache/spark/scheduler/DAGScheduler.scala | 1 - 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index c5f6d25a24e8..32d703e49bd7 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -426,16 +426,14 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, /** Get the epoch for map output for a shuffle, if it is available */ def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = { - val arrayOpt = mapStatuses.get(shuffleId) - if (arrayOpt.isDefined && arrayOpt.get != null) { - val array = 
arrayOpt.get - array.synchronized { - if (array(mapId) != null) { - return Some(epochForMapStatus(shuffleId)(mapId)) - } - } + if (mapId < 0) { + return None } - None + for { + mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray => + Option(mapStatusArray(mapId)) + } + } yield epochForMapStatus(shuffleId)(mapId) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index af731a7c6fbc..1e7884d77e2c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1295,7 +1295,6 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) - val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId) // It is possible that the map output was regenerated by rerun of the stage and the // fetch failure is being reported for stale map output. In that case, we should just // ignore the fetch failure and relaunch the task with latest map output info. From ace8464a1ec34864e56fbfceaac509895dcf31d4 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 28 Mar 2017 21:11:46 -0700 Subject: [PATCH 11/11] Remove test which is not applicable --- .../scheduler/TaskSchedulerImplSuite.scala | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 8b9d45f734cd..a1724ea6b58e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -199,25 +199,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(taskDescriptions.map(_.executorId) === Seq("executor0")) } - test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") { - val taskScheduler = setupScheduler() - val attempt1 = FakeTask.createTaskSet(1, 0) - val attempt2 = FakeTask.createTaskSet(1, 1) - taskScheduler.submitTasks(attempt1) - intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } - - // OK to submit multiple if previous attempts are all zombie - taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId) - .get.isZombie = true - taskScheduler.submitTasks(attempt2) - val attempt3 = FakeTask.createTaskSet(1, 2) - intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } - taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId) - .get.isZombie = true - taskScheduler.submitTasks(attempt3) - assert(!failedTaskSet) - } - test("don't schedule more tasks after a taskset is zombie") { val taskScheduler = setupScheduler()