diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 4ef6656222455..32d703e49bd74 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -292,6 +292,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala + // HashMap for storing the epoch for the mapStatuses on the driver. This will be used to + // detect and ignore any bogus fetch failures + private val epochForMapStatus = new ConcurrentHashMap[Int, Array[Long]]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) // Kept in sync with cachedSerializedStatuses explicitly @@ -370,6 +373,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } + epochForMapStatus.put(shuffleId, new Array[Long](numMaps)) // add in advance shuffleIdLocks.putIfAbsent(shuffleId, new Object()) } @@ -378,6 +382,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, val array = mapStatuses(shuffleId) array.synchronized { array(mapId) = status + val epochs = epochForMapStatus(shuffleId) + epochs(mapId) = epoch } } @@ -418,6 +424,18 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf, cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId) } + /** Get the epoch for map output for a shuffle, if it is available */ + def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = { + if (mapId < 0) { + return None + } + for { + mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray => + Option(mapStatusArray(mapId)) + } + } yield epochForMapStatus(shuffleId)(mapId) + } + /** * Return the preferred hosts on which to run the given map output partition in a given shuffle, * i.e. the nodes that the most outputs for that partition are on. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 09717316833a7..1e7884d77e2c4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -265,6 +265,13 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) } + /** + * Called by the TaskSetManager when a set of tasks are aborted due to fetch failure. 
+ */ + def tasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = { + eventProcessLoop.post(TasksAborted(stageId, tasks)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times @@ -819,6 +826,15 @@ class DAGScheduler( stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) } } + private[scheduler] def handleTasksAborted( + stageId: Int, + tasks: Seq[Task[_]]): Unit = { + for { + stage <- stageIdToStage.get(stageId) + task <- tasks + } stage.pendingPartitions -= task.partitionId + } + private[scheduler] def cleanUpAfterSchedulerStop() { for (job <- activeJobs) { val error = @@ -945,12 +961,21 @@ class DAGScheduler( } } - /** Called when stage's parents are available and we can now do its task. */ + /** + * Called when stage's parents are available and we can now run its task. + * This only submits the partitions which are missing and have not been + * submitted to the lower-level scheduler for execution. + */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") - // First figure out the indexes of partition ids to compute. - val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() + val missingPartitions = stage.findMissingPartitions() + val partitionsToCompute = missingPartitions.filterNot(stage.pendingPartitions) + stage.pendingPartitions ++= partitionsToCompute + + if (partitionsToCompute.isEmpty) { + return + } // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage @@ -963,10 +988,11 @@ class DAGScheduler( // event. stage match { case s: ShuffleMapStage => - outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) + outputCommitCoordinator.stageStart(stage = s.id, partitionsToCompute, + maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( - stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) + stage = s.id, partitionsToCompute, maxPartitionId = s.rdd.partitions.length - 1) } val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { @@ -1027,11 +1053,9 @@ class DAGScheduler( val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => - stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) - stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) @@ -1057,6 +1081,7 @@ class DAGScheduler( if (tasks.size > 0) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") + logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) @@ -1162,6 +1187,7 @@ class DAGScheduler( val stage = stageIdToStage(task.stageId) event.reason match { case Success => + stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask @@ -1179,6 +1205,12 @@ class DAGScheduler( 
cleanupStateForJobAndIndependentStages(job) listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + } else if (resultStage.pendingPartitions.isEmpty) { + logInfo("Resubmitting " + resultStage + " (" + resultStage.name + + ") because some of its tasks had failed: " + + resultStage.findMissingPartitions().mkString(", ")) + markStageAsFinished(resultStage) + submitStage(resultStage) } // taskSucceeded runs some user code that might throw an exception. Make sure @@ -1201,30 +1233,12 @@ class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { - // This task was for the currently running attempt of the stage. Since the task - // completed successfully from the perspective of the TaskSetManager, mark it as - // no longer pending (the TaskSetManager may consider the task complete even - // when the output needs to be ignored because the task's epoch is too small below. - // In this case, when pending partitions is empty, there will still be missing - // output locations, which will cause the DAGScheduler to resubmit the stage below.) - shuffleStage.pendingPartitions -= task.partitionId - } - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") - } else { - // The epoch of the task is acceptable (i.e., the task was launched after the most - // recent failure we're aware of for the executor), so mark the task's output as - // available. - shuffleStage.addOutputLoc(smt.partitionId, status) - // Remove the task's partition from pending partitions. This may have already been - // done above, but will not have been done yet in cases where the task attempt was - // from an earlier attempt of the stage (i.e., not the attempt that's currently - // running). This allows the DAGScheduler to mark the stage as complete when one - // copy of each task has finished successfully, even if the currently active stage - // still has tasks running. - shuffleStage.pendingPartitions -= task.partitionId - } + shuffleStage.addOutputLoc(smt.partitionId, status) + + mapOutputTracker.registerMapOutput( + shuffleStage.shuffleDep.shuffleId, + smt.partitionId, + status) if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { markStageAsFinished(shuffleStage) @@ -1270,7 +1284,6 @@ class DAGScheduler( logInfo("Resubmitted " + task + ", so marking it as still running") stage match { case sms: ShuffleMapStage => - sms.pendingPartitions += task.partitionId case _ => assert(false, "TaskSetManagers should only send Resubmitted task statuses for " + @@ -1278,68 +1291,15 @@ class DAGScheduler( } case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => + stage.pendingPartitions -= task.partitionId val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) - if (failedStage.latestInfo.attemptId != task.stageAttemptId) { - logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + - s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + - s"(attempt ID ${failedStage.latestInfo.attemptId}) running") - } else { - // It is likely that we receive multiple FetchFailed for a single stage (because we have - // multiple tasks running concurrently on different executors). 
In that case, it is - // possible the fetch failure has already been handled by the scheduler. - if (runningStages.contains(failedStage)) { - logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + - s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some(failureMessage)) - } else { - logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + - s"longer running") - } - - failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) - val shouldAbortStage = - failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest - - if (shouldAbortStage) { - val abortMessage = if (disallowStageRetryForTest) { - "Fetch failure will not retry stage due to testing config" - } else { - s"""$failedStage (${failedStage.name}) - |has failed the maximum allowable number of - |times: $maxConsecutiveStageAttempts. - |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") - } - abortStage(failedStage, abortMessage, None) - } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued - // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064 - val noResubmitEnqueued = !failedStages.contains(failedStage) - failedStages += failedStage - failedStages += mapStage - if (noResubmitEnqueued) { - // We expect one executor failure to trigger many FetchFailures in rapid succession, - // but all of those task failures can typically be handled by a single resubmission of - // the failed stage. We avoid flooding the scheduler's event queue with resubmit - // messages by checking whether a resubmit is already in the event queue for the - // failed stage. If there is already a resubmit enqueued for a different failed - // stage, that event would also be sufficient to handle the current failed stage, but - // producing a resubmit for each failed stage makes debugging and logging a little - // simpler while not producing an overwhelming number of scheduler events. - logInfo( - s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure" - ) - messageScheduler.schedule( - new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, - DAGScheduler.RESUBMIT_TIMEOUT, - TimeUnit.MILLISECONDS - ) - } - } + // It is possible that the map output was regenerated by rerun of the stage and the + // fetch failure is being reported for stale map output. In that case, we should just + // ignore the fetch failure and relaunch the task with latest map output info. + for(epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId) if + epochForMapOutput <= task.epoch) { // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) @@ -1352,6 +1312,61 @@ class DAGScheduler( } } + // It is likely that we receive multiple FetchFailed for a single stage (because we have + // multiple tasks running concurrently on different executors). In that case, it is + // possible the fetch failure has already been handled by the scheduler. 
+ if (runningStages.contains(failedStage)) { + logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + + s"due to a fetch failure from $mapStage (${mapStage.name})") + markStageAsFinished(failedStage, Some(failureMessage)) + } else { + logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + + s"longer running") + } + + failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) + val shouldAbortStage = + failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest + + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: $maxConsecutiveStageAttempts. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") + } + abortStage(failedStage, abortMessage, None) + } else { + // update failedStages and make sure a ResubmitFailedStages event is enqueued + val noResubmitEnqueued = !failedStages.contains(failedStage) + failedStages += failedStage + failedStages += mapStage + if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. 
+ logInfo( + s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, + DAGScheduler.RESUBMIT_TIMEOUT, + TimeUnit.MILLISECONDS + ) + } + } + case commitDenied: TaskCommitDenied => // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits @@ -1468,7 +1483,6 @@ class DAGScheduler( logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") } - outputCommitCoordinator.stageEnd(stage.id) listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } @@ -1718,6 +1732,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) + case TasksAborted(stageId, tasks) => + dagScheduler.handleTasksAborted(stageId, tasks) + case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index cda0585f154a9..b2a9e3905527b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -90,4 +90,7 @@ private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent +private[scheduler] +case class TasksAborted(stageId: Int, tasks: Seq[Task[_]]) extends DAGSchedulerEvent + private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 83d87b548a430..3625f25aadff7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -59,20 +59,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * exclusive lock on committing task output for that partition, as well as any known failed * attempts in the stage. * - * Entries are added to the top-level map when stages start and are removed they finish - * (either successfully or unsuccessfully). + * Entries are added to the top-level map when stages start. * * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. */ private val stageStates = mutable.Map[StageId, StageState]() - /** - * Returns whether the OutputCommitCoordinator's internal data structures are all empty. - */ - def isEmpty: Boolean = { - stageStates.isEmpty - } - /** * Called by tasks to ask whether they can commit their output to HDFS. * @@ -109,13 +101,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). 
   */
-  private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
-    stageStates(stage) = new StageState(maxPartitionId + 1)
-  }
-
-  // Called by DAGScheduler
-  private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
-    stageStates.remove(stage)
+  private[scheduler] def stageStart(stage: StageId,
+      partitionsToCompute: Seq[Int],
+      maxPartitionId: Int): Unit = synchronized {
+    val stageState = stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1))
+    for (i <- partitionsToCompute) {
+      stageState.authorizedCommitters(i) = NO_AUTHORIZED_COMMITTER
+    }
   }

   // Called by DAGScheduler
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index db4d9efa2270c..295b0007efb09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -49,17 +49,6 @@ private[spark] class ShuffleMapStage(
   private[this] var _numAvailableOutputs: Int = 0
-  /**
-   * Partitions that either haven't yet been computed, or that were computed on an executor
-   * that has since been lost, so should be re-computed. This variable is used by the
-   * DAGScheduler to determine when a stage has completed. Task successes in both the active
-   * attempt for the stage or in earlier attempts for this stage can cause paritition ids to get
-   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
-   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
-   * will always be a subset of the partitions that the TaskSetManager thinks are pending).
-   */
-  val pendingPartitions = new HashSet[Int]
-
   /**
    * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
    * and each value in the array is the list of possible [[MapStatus]] for a partition
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 290fd073caf27..cafb30ad586b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler
+import scala.collection.mutable
 import scala.collection.mutable.HashSet
 import org.apache.spark.executor.TaskMetrics
@@ -67,6 +68,12 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
+  /**
+   * Set of partitions that have been submitted to the lower-level scheduler and that
+   * should not be resubmitted when the stage is rerun.
+   */
+  val pendingPartitions = new HashSet[Int]
+
   /** The ID to use for the next new attempt for this stage.
*/ private var nextAttemptId: Int = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 07aea773fa632..0578ce7a7c08f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -195,13 +195,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager - val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => - ts.taskSet != taskSet && !ts.isZombie - } - if (conflictingTaskSet) { - throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + - s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") - } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a177aab5f95de..44f7c00ad43f8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -109,8 +109,8 @@ private[spark] class TaskSetManager( // the zombie state once at least one attempt of each task has completed successfully, or if the // task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie - // state in order to continue to track and account for the running tasks. - // TODO: We should kill any running task attempts when the task set manager becomes a zombie. + // state in order to continue to track and account for the running tasks. The tasks running in the + // zombie TaskSetManagers are not rerun by the DagScheduler unless they fail. private[scheduler] var isZombie = false // Set of pending tasks for each executor. These collections are actually @@ -767,12 +767,29 @@ private[spark] class TaskSetManager( s" executor ${info.executorId}): ${reason.toErrorString}" val failureException: Option[Throwable] = reason match { case fetchFailed: FetchFailed => + if (!isZombie) { + // Only for the first occurrence of the fetch failure, get the list + // of all non-running and non-successful tasks and notify the + // DagScheduler of their abortion so that they can be rescheduled in retry + // of the stage. Note that this does not include the fetch failed tasks, + // because that is separately handled by the DagScheduler. 
+ val abortedTasks = new ArrayBuffer[Task[_]] + for (i <- 0 until numTasks) { + if (i != index && !successful(i) && copiesRunning(i) == 0) { + abortedTasks += taskSet.tasks(i) + } + } + if (!abortedTasks.isEmpty) { + sched.dagScheduler.tasksAborted(abortedTasks(0).stageId, abortedTasks) + } + isZombie = true + } + logWarning(failureReason) if (!successful(index)) { successful(index) = true tasksSuccessful += 1 } - isZombie = true None case ef: ExceptionFailure => @@ -825,6 +842,14 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) + if (reason != Success && !reason.isInstanceOf[FetchFailed] && isZombie) { + // If the TaskSetManager is in zombie mode, we should inform the DAGScheduler to abort + // the task in case of failure so that the DagScheduler can rerun it in the retry of + // the stage. Please note that we exclude fetch failed tasks, because they are handled + // by the DAGScheduler separately. + sched.dagScheduler.tasksAborted(tasks(index).stageId, Array(tasks(index))) + } + if (successful(index)) { logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" + s" be re-executed (either because the task failed with a shuffle data fetch failure," + diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe2..f574e9cc18df9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -606,6 +606,320 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("Test no duplicate shuffle map tasks running on fetch failure (SPARK-14649)") { + val firstRDD = new MyRDD(sc, 2, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + + // things start out smoothly, stage 0 completes with no issues + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)), + (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length)) + )) + + // Begin event for the reduce tasks. 
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // fail taks 0 in stage 1 due to fetch failure + val failedTask1 = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask1, + FetchFailed(makeBlockManagerId("hostA"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // abort task 1 in stage 1 due to fetch failure, task 2 still running + val abortedTask1 = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask1))) + + // Make sure that we still have 2 running tasks for the first attempt + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // so we resubmit stage 0, which completes happily + val stage0Resubmit1 = taskSets(2) + assert(stage0Resubmit1.stageId == 0) + assert(stage0Resubmit1.stageAttemptId === 1) + val task1 = stage0Resubmit1.tasks(0) + runEvent(makeCompletionEvent( + task1, + Success, + makeMapStatus("hostC", shuffleMapRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + // we will now have a task set representing + // the second attempt for stage 1, but we *also* have 1 task for the first attempt for + // stage 1 still going, so we make sure that we don't resubmit the already running tasks. + val stage1Resubmit1 = taskSets(3) + assert(stage1Resubmit1.stageId == 1) + assert(stage1Resubmit1.stageAttemptId === 1) + assert(stage1Resubmit1.tasks.length === 2) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // Now fail the running task from the first attempt and + // succeed all others + val succeededTask1 = taskSets(3).tasks(0) + runEvent(makeCompletionEvent( + succeededTask1, + Success, + makeMapStatus("hostC", reduceRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + val failedTask2 = taskSets(1).tasks(2) + runEvent(makeCompletionEvent( + failedTask2, + FetchFailed(makeBlockManagerId("hostB"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + val succeededTask2 = taskSets(3).tasks(1) + runEvent(makeCompletionEvent( + succeededTask2, + Success, + makeMapStatus("hostC", reduceRdd.partitions.length), + Seq.empty, createFakeTaskInfo())) + + // Sleep for some time for the completion event to be processed by the DagScheduler + // and make sure that stage 0 is resumbitted. + Thread.sleep(1000) + val stage0Resubmit2 = taskSets(4) + assert(stage0Resubmit2.stageId == 0) + assert(stage0Resubmit2.stageAttemptId === 2) + assert(stage0Resubmit2.tasks.length === 1) + val task2 = stage0Resubmit2.tasks(0) + runEvent(makeCompletionEvent( + task2, + Success, + makeMapStatus("hostC", shuffleMapRdd.partitions.length))) + + Thread.sleep(1000) + // Make sure we resubmit the only failed tasks in stage 1. 
+ val stage1Resubmit2 = taskSets(5) + assert(stage1Resubmit2.stageId == 1) + assert(stage1Resubmit2.stageAttemptId === 2) + assert(stage1Resubmit2.tasks.length === 1) + } + + test("Test allow commit from task from staged failed due to fetch failure (SPARK-14649)") { + val firstRDD = new MyRDD(sc, 2, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + + // things start out smoothly, stage 0 completes with no issues + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)), + (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length)) + )) + + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + + // fail taks 0 in stage 1 due to fetch failure + val failedTask1 = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask1, + FetchFailed(makeBlockManagerId("hostA"), firstShuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // abort task 1 in stage 1 due to fetch failure, task 2 still running + val abortedTask1 = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask1))) + + // Make sure that we still have 2 running tasks for the first attempt + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // so we resubmit stage 0, which completes happily + val stage0Resubmit1 = taskSets(2) + assert(stage0Resubmit1.stageId == 0) + assert(stage0Resubmit1.stageAttemptId === 1) + + // Now the running task from the first attempt tries to commit the output and we + // make sure it succeeds. + assert(scheduler.outputCommitCoordinator.canCommit(1, + taskSets(1).tasks(2).partitionId, 0)) + } + + test("Test no duplicate shuffle reduce tasks running on fetch failure (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. 
+ assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will be still running state so + // it should not be resumbitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostC", reduceRdd.partitions.length)))) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + assert(taskSets(3).tasks.size == 2) + + // Finish the taskSet(3) successfully + runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, 42, + Seq.empty, createFakeTaskInfo())) + + runEvent(makeCompletionEvent( + taskSets(3).tasks(1), Success, 42, + Seq.empty, createFakeTaskInfo())) + + // Fail the running tasks in taskSets(1) and make sure its resubmitted + runEvent(makeCompletionEvent( + taskSets(1).tasks(2), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // Make sure that we resubmit the failed reduce task + Thread.sleep(1000) + assert(taskSets(4).tasks.size == 1) + } + + test("Test fetch failure on stale map output do not " + + "cause resubmission of the stage (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. + assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will be still running state so + // it should not be resumbitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) + + + Thread.sleep(1000) + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + assert(taskSets(3).tasks.size == 2) + + // Fetch fail taskSet(1).tasks(2) for mapId 0 which has already been re-computed + // by rerun of the stage 0. This fetch failure should be ignored and this should not + // trigger another rerun of the stage 0. 
+ runEvent(makeCompletionEvent( + taskSets(1).tasks(2), + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 2, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + + assert(taskSets(4).tasks.size == 1) + assert(taskSets(4).tasks(0).stageId == 1) + assert(taskSets(4).tasks(0).stageAttemptId == 2) + } + + test("Test task rerun in case of failure in zombie taskSet (SPARK-14649)") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1, 2)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) + // Begin event for the reduce tasks. + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + // Fail one and abort one of the reduce tasks due to fetch failure. + assert(taskSets(1).tasks.size === 3) + val failedTask = taskSets(1).tasks(0) + runEvent(makeCompletionEvent( + failedTask, + FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), + 42, Seq.empty, createFakeTaskInfo())) + val abortedTask = taskSets(1).tasks(1) + runEvent(new TasksAborted(1, List(abortedTask))) + // note that taskSet(1).tasks(2) will be still running state so + // it should not be resumbitted in the next retry + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(1)) + + // Wait for resubmission of the map stage + Thread.sleep(1000) + // Retry of the map stage finishes happily + assert(taskSets(2).tasks.size === 1) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostC", reduceRdd.partitions.length)))) + + // Newly submitted taskSet for the reduce phase should not contain the + // running task. + assert(taskSets(3).tasks.size == 2) + + // Abort the running tasks in taskSets(1) (due to some exception failure). Since the + // taskSet(1) is in zombie state now, the DagScheduler should rerun the + // aborted task. + + runEvent(new TasksAborted(1, List(taskSets(1).tasks(2)))) + + // Finish the taskSet(3) successfully + runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, 42, + Seq.empty, createFakeTaskInfo())) + + runEvent(makeCompletionEvent( + taskSets(3).tasks(1), Success, 42, + Seq.empty, createFakeTaskInfo())) + + // Make sure that we resubmit the failed reduce task + Thread.sleep(1000) + assert(taskSets(4).tasks.size == 1) + } + test("run trivial shuffle with fetch failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) @@ -1084,6 +1398,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // Trigger resubmission of the failed map stage and finish the re-started map task. runEvent(ResubmitFailedStages) + assert(taskSets(2).tasks.size === 1) complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) // Because the map stage finished, another attempt for the reduce stage should have been @@ -1102,8 +1417,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // the FetchFailed should have been ignored runEvent(ResubmitFailedStages) - // The FetchFailed from the original reduce stage should be ignored. 
- assert(countSubmittedMapStageAttempts() === 2) + // The FetchFailed from the original reduce stage should not be ignored. + assert(countSubmittedMapStageAttempts() === 3) } test("task events always posted in speculation / when stage is killed") { @@ -1152,63 +1467,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(sparkListener.endedTasks.size == 6) } - test("ignore late map task completions") { - val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) - submit(reduceRdd, Array(0, 1)) - - // pretend we were told hostA went away - val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - val newEpoch = mapOutputTracker.getEpoch - assert(newEpoch > oldEpoch) - - // now start completing some tasks in the shuffle map stage, under different hosts - // and epochs, and make sure scheduler updates its state correctly - val taskSet = taskSets(0) - val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] - assert(shuffleStage.numAvailableOutputs === 0) - - // should be ignored for being too old - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 0) - - // should work because it's a non-failed host (so the available map outputs will increase) - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostB", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 1) - - // should be ignored for being too old - runEvent(makeCompletionEvent( - taskSet.tasks(0), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 1) - - // should work because it's a new epoch, which will increase the number of available map - // outputs, and also finish the stage - taskSet.tasks(1).epoch = newEpoch - runEvent(makeCompletionEvent( - taskSet.tasks(1), - Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) - assert(shuffleStage.numAvailableOutputs === 2) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) - - // finish the next stage normally, which completes the job - complete(taskSets(1), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) - assertDataStructuresEmpty() - } - test("run shuffle with map stage failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) @@ -1312,11 +1570,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // now here is where things get tricky : we will now have a task set representing // the second attempt for stage 1, but we *also* have some tasks for the first attempt for - // stage 1 still going + // stage 1 still going, so we make sure that we don't resubmit the already running tasks. 
val stage1Resubmit = taskSets(3) assert(stage1Resubmit.stageId == 1) assert(stage1Resubmit.stageAttemptId === 1) - assert(stage1Resubmit.tasks.length === 3) + assert(stage1Resubmit.tasks.length === 1) // we'll have some tasks finish from the first attempt, and some finish from the second attempt, // so that we actually have all stage outputs, though no attempt has completed all its @@ -1326,7 +1584,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou Success, makeMapStatus("hostC", reduceRdd.partitions.length))) runEvent(makeCompletionEvent( - taskSets(3).tasks(1), + taskSets(1).tasks(1), Success, makeMapStatus("hostC", reduceRdd.partitions.length))) // late task finish from the first attempt @@ -1575,50 +1833,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - /** - * In this test, we run a map stage where one of the executors fails but we still receive a - * "zombie" complete message from a task that ran on that executor. We want to make sure the - * stage is resubmitted so that the task that ran on the failed executor is re-executed, and - * that the stage is only marked as finished once that task completes. - */ - test("run trivial shuffle with out-of-band executor failure and retry") { - val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) - submit(reduceRdd, Array(0)) - // Tell the DAGScheduler that hostA was lost. - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) - - // At this point, no more tasks are running for the stage (and the TaskSetManager considers the - // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler - // should re-submit the stage with one task (the task that originally ran on HostA). - assert(taskSets.size === 2) - assert(taskSets(1).tasks.size === 1) - - // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce - // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on - // alive executors). - assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask]) - - // have hostC complete the resubmitted task - complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) - - // Make sure that the reduce stage was now submitted. - assert(taskSets.size === 3) - assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]]) - - // Complete the reduce stage. - complete(taskSets(2), Seq((Success, 42))) - assert(results === Map(0 -> 42)) - assertDataStructuresEmpty() - } - test("recursive shuffle failures") { val shuffleOneRdd = new MyRDD(sc, 2, Nil) val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2)) @@ -2054,73 +2268,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - /** - * In this test, we run a map stage where one of the executors fails but we still receive a - * "zombie" complete message from that executor. We want to make sure the stage is not reported - * as done until all tasks have completed. 
- * - * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band - * executor failure and retry". However, that test uses ShuffleMapStages that are followed by - * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a - * ResultStage after it. - */ - test("map stage submission with executor failure late map task completions") { - val shuffleMapRdd = new MyRDD(sc, 3, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - - submitMapStage(shuffleDep) - - val oldTaskSet = taskSets(0) - runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2))) - assert(results.size === 0) // Map stage job should not be complete yet - - // Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it - // completed on hostA. - val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) - val newEpoch = mapOutputTracker.getEpoch - assert(newEpoch > oldEpoch) - - // Suppose we also get a completed event from task 1 on the same host; this should be ignored - runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2))) - assert(results.size === 0) // Map stage job should not be complete yet - - // A completion from another task should work because it's a non-failed host - runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2))) - - // At this point, no more tasks are running for the stage (and the TaskSetManager considers - // the stage complete), but the task that ran on hostA needs to be re-run, so the map stage - // shouldn't be marked as complete, and the DAGScheduler should re-submit the stage. - assert(results.size === 0) - assert(taskSets.size === 2) - - // Now complete tasks in the second task set - val newTaskSet = taskSets(1) - // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA). - assert(newTaskSet.tasks.size === 2) - // Complete task 0 from the original task set (i.e., not hte one that's currently active). - // This should still be counted towards the job being complete (but there's still one - // outstanding task). - runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2))) - assert(results.size === 0) - - // Complete the final task, from the currently active task set. There's still one - // running task, task 0 in the currently active stage attempt, but the success of task 0 means - // the DAGScheduler can mark the stage as finished. 
- runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2))) - assert(results.size === 1) // Map stage job should now finally be complete - assertDataStructuresEmpty() - - // Also test that a reduce stage using this shuffled data can immediately run - val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) - results.clear() - submit(reduceRDD, Array(0, 1)) - complete(taskSets(2), Seq((Success, 42), (Success, 43))) - assert(results === Map(0 -> 42, 1 -> 43)) - results.clear() - assertDataStructuresEmpty() - } - /** * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular @@ -2246,28 +2393,19 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou (Success, makeMapStatus("hostB", 2)))) // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA - // successfully. The success should be ignored because the task started before the - // executor failed, so the output may have been lost. + // successfully. runEvent(makeCompletionEvent( taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2))) - // Both tasks in rddB should be resubmitted, because none of them has succeeded truely. + // Only one task in rddB should be resubmitted, because one of them already completed. // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully. // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt // is still running. assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 - && taskSets(3).tasks.size === 2) + && taskSets(3).tasks.size === 1) runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) - // There should be no new attempt of stage submitted, - // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in - // the current attempt (and hasn't completed successfully in any earlier attempts). - assert(taskSets.size === 4) - - // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully. - runEvent(makeCompletionEvent( - taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2))) // Now the ResultStage should be submitted, because all of the tasks of rddB have // completed successfully on alive executors. 
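The stale-fetch-failure behaviour exercised by the tests above hinges on the epoch bookkeeping added to MapOutputTrackerMaster (epochForMapStatus / getEpochForMapOutput) and consumed in DAGScheduler.handleTaskCompletion. As a minimal illustrative sketch, not part of this patch and with a hypothetical helper name, the decision reduces to comparing the epoch recorded when the map output was registered against the epoch of the task that reported the failure:

// Hypothetical helper (illustration only): returns true when a FetchFailed report refers to
// the map output that is currently registered, i.e. the output was registered at or before
// the failed task's epoch and should be invalidated; returns false when the output has since
// been regenerated at a newer epoch, in which case the failure is treated as stale.
def fetchFailureRefersToCurrentOutput(
    tracker: org.apache.spark.MapOutputTrackerMaster,
    shuffleId: Int,
    mapId: Int,
    taskEpoch: Long): Boolean = {
  tracker.getEpochForMapOutput(shuffleId, mapId).exists(_ <= taskEpoch)
}

In the patch itself this check appears inline as the for-comprehension guard (epochForMapOutput <= task.epoch) around the removeOutputLoc/unregisterMapOutput calls in the FetchFailed branch of handleTaskCompletion.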
@@ -2297,7 +2435,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.runningStages.isEmpty) assert(scheduler.shuffleIdToMapStage.isEmpty) assert(scheduler.waitingStages.isEmpty) - assert(scheduler.outputCommitCoordinator.isEmpty) } // Nothing in this test should break if the task info's fields are null, but diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index e51e6a0d3ff6b..1772afd811caf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -170,7 +170,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val partition: Int = 2 val authorizedCommitter: Int = 3 val nonAuthorizedCommitter: Int = 100 - outputCommitCoordinator.stageStart(stage, maxPartitionId = 2) + outputCommitCoordinator.stageStart(stage, Array(1, 2), maxPartitionId = 2) assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter)) assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter)) @@ -201,7 +201,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val stage: Int = 1 val partition: Int = 1 val failedAttempt: Int = 0 - outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + outputCommitCoordinator.stageStart(stage, Array(1), maxPartitionId = 1) outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt, reason = ExecutorLostFailure("0", exitCausedByApp = true, None)) assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 8b9d45f734cda..a1724ea6b58ef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -199,25 +199,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(taskDescriptions.map(_.executorId) === Seq("executor0")) } - test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") { - val taskScheduler = setupScheduler() - val attempt1 = FakeTask.createTaskSet(1, 0) - val attempt2 = FakeTask.createTaskSet(1, 1) - taskScheduler.submitTasks(attempt1) - intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) } - - // OK to submit multiple if previous attempts are all zombie - taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId) - .get.isZombie = true - taskScheduler.submitTasks(attempt2) - val attempt3 = FakeTask.createTaskSet(1, 2) - intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) } - taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId) - .get.isZombie = true - taskScheduler.submitTasks(attempt3) - assert(!failedTaskSet) - } - test("don't schedule more tasks after a taskset is zombie") { val taskScheduler = setupScheduler() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 132caef0978fb..ec17710b33ad4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -34,11 +34,17 @@ import org.apache.spark.util.{AccumulatorV2, ManualClock} class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) extends DAGScheduler(sc) { + val abortedPartitions = new mutable.HashSet[Int] override def taskStarted(task: Task[_], taskInfo: TaskInfo) { taskScheduler.startedTasks += taskInfo.index } + override def tasksAborted(stageId: Int, tasks: Seq[Task[_]]): Unit = { + for (task <- tasks) { + abortedPartitions += task.partitionId + } + } override def taskEnded( task: Task[_], reason: TaskEndReason, @@ -415,6 +421,92 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } } + test("pending tasks should be aborted after first fetch failure") { + val rescheduleDelay = 300L + val conf = new SparkConf(). + set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). + // don't wait to jump locality levels in this test + set("spark.locality.wait", "0") + + sc = new SparkContext("local", "test", conf) + // two executors on same host, one on different. + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec1.1", "host1"), ("exec2", "host2")) + // affinity to exec1 on host1 - which we will fail. + val taskSet = FakeTask.createTaskSet(4) + val clock = new ManualClock + clock.advance(1) + val manager = new TaskSetManager(sched, taskSet, 4, None, clock) + + val offerResult1 = manager.resourceOffer("exec1", "host1", ANY) + assert(offerResult1.isDefined, "Expect resource offer to return a task") + + assert(offerResult1.get.index === 0) + assert(offerResult1.get.executorId === "exec1") + + val offerResult2 = manager.resourceOffer("exec2", "host2", ANY) + assert(offerResult2.isDefined, "Expect resource offer to return a task") + + assert(offerResult2.get.index === 1) + assert(offerResult2.get.executorId === "exec2") + // At this point, we have 2 tasks running and 2 pending. First fetch failure should + // abort all the pending tasks but the running tasks should not be aborted. + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + val dagScheduler = sched.dagScheduler.asInstanceOf[FakeDAGScheduler] + assert(dagScheduler.abortedPartitions.size === 2) + + dagScheduler.abortedPartitions.clear() + // Second fetch failure should not notify the DagScheduler of the aborted tasks. + + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + assert(dagScheduler.abortedPartitions.size === 0) + } + + test("Failed tasks should be aborted in zombie mode") { + val rescheduleDelay = 300L + val conf = new SparkConf(). + set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString). + // don't wait to jump locality levels in this test + set("spark.locality.wait", "0") + + sc = new SparkContext("local", "test", conf) + // two executors on same host, one on different. + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec1.1", "host1"), ("exec2", "host2")) + // affinity to exec1 on host1 - which we will fail. 
+ val taskSet = FakeTask.createTaskSet(4) + val clock = new ManualClock + clock.advance(1) + val manager = new TaskSetManager(sched, taskSet, 4, None, clock) + + val offerResult1 = manager.resourceOffer("exec1", "host1", ANY) + assert(offerResult1.isDefined, "Expect resource offer to return a task") + + assert(offerResult1.get.index === 0) + assert(offerResult1.get.executorId === "exec1") + + val offerResult2 = manager.resourceOffer("exec2", "host2", ANY) + assert(offerResult2.isDefined, "Expect resource offer to return a task") + + assert(offerResult2.get.index === 1) + assert(offerResult2.get.executorId === "exec2") + // At this point, we have 2 tasks running and 2 pending. First fetch failure should + // abort all the pending tasks but the running tasks should not be aborted. + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, + FetchFailed(BlockManagerId("exec-host2", "host2", 12345), 0, 0, 0, "ignored")) + val dagScheduler = sched.dagScheduler.asInstanceOf[FakeDAGScheduler] + assert(dagScheduler.abortedPartitions.size === 2) + + dagScheduler.abortedPartitions.clear() + + val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), Seq.empty) + manager.handleFailedTask(offerResult1.get.taskId, TaskState.FINISHED, exceptionFailure) + + assert(dagScheduler.abortedPartitions.size === 1) + } + test("executors should be blacklisted after task failure, in spite of locality preferences") { val rescheduleDelay = 300L val conf = new SparkConf().