diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 09717316833a7..8d2bc8e20c0af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -172,7 +172,8 @@ class DAGScheduler(
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
   // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+  // executor or host, increment it for new tasks, and use this to ignore stray
+  // ShuffleMapTask results.
   //
   // TODO: Garbage collect information about failure epochs when we know there are no more
   // stray messages to detect.
@@ -1348,7 +1349,14 @@ class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+          if (env.blockManager.externalShuffleServiceEnabled) {
+            val currentEpoch = task.epoch
+            removeExecutor(bmAddress.executorId, currentEpoch)
+            handleExternalShuffleFailure(bmAddress.host, currentEpoch)
+          }
+          else {
+            handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+          }
         }
       }
 
@@ -1368,6 +1376,30 @@ class DAGScheduler(
     }
   }
 
+  /**
+   * Removes a failed executor from the block manager master and records the epoch of the failure.
+   *
+   * @param execId id of the executor to be removed
+   * @param currentEpoch epoch in which the failure was observed; used to ignore stray failure
+   *                     messages from earlier epochs so that they do not re-trigger handling of
+   *                     an executor that has already been marked as lost.
+   *
+   * @return true if the executor had not already been marked as failed for this epoch
+   */
+  private[scheduler] def removeExecutor(execId: String, currentEpoch: Long): Boolean = {
+    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
+      failedEpoch(execId) = currentEpoch
+      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+      blockManagerMaster.removeExecutor(execId)
+      true
+    }
+    else {
+      logDebug("Additional executor lost message for " + execId +
+        " (epoch " + currentEpoch + ")")
+      false
+    }
+  }
+
   /**
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
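Not part of the patch itself: a self-contained Scala sketch of the epoch-gating idea that the new removeExecutor (and, further down, handleExternalShuffleFailure) relies on. The names here (EpochGatingSketch, shouldHandle) are invented for illustration; only the failedEpoch bookkeeping mirrors the code above.

import scala.collection.mutable

// Standalone illustration (not Spark code) of epoch gating: a failure is acted on only the
// first time it is seen for a given executor id or host at a new epoch; stray messages that
// carry the same or an older epoch are ignored.
object EpochGatingSketch {
  private val failedEpoch = mutable.HashMap.empty[String, Long]

  /** Returns true if this failure is new enough to be handled, and records its epoch. */
  def shouldHandle(id: String, currentEpoch: Long): Boolean = {
    if (!failedEpoch.contains(id) || failedEpoch(id) < currentEpoch) {
      failedEpoch(id) = currentEpoch
      true
    } else {
      false // stray message from the same or an earlier epoch
    }
  }

  def main(args: Array[String]): Unit = {
    println(shouldHandle("exec-1", 2L)) // true: first failure seen for exec-1 at epoch 2
    println(shouldHandle("exec-1", 1L)) // false: stray message from epoch 1 is ignored
    println(shouldHandle("hostA", 2L))  // true: the patch keys the same map by host as well
  }
}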
@@ -1385,38 +1417,76 @@ class DAGScheduler(
       filesLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
-      blockManagerMaster.removeExecutor(execId)
+    val executorRemoved = removeExecutor(execId, currentEpoch)
+    if (executorRemoved && (filesLost || !env.blockManager.externalShuffleServiceEnabled)) {
+      handleInternalShuffleFailure(execId, currentEpoch)
+    }
+  }
 
-      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
-        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-        // TODO: This will be really slow if we keep accumulating shuffle map stages
-        for ((shuffleId, stage) <- shuffleIdToMapStage) {
-          stage.removeOutputsOnExecutor(execId)
-          mapOutputTracker.registerMapOutputs(
-            shuffleId,
-            stage.outputLocInMapOutputTrackerFormat(),
-            changeEpoch = true)
-        }
-        if (shuffleIdToMapStage.isEmpty) {
-          mapOutputTracker.incrementEpoch()
-        }
-        clearCacheLocs()
-      }
+  /**
+   * Responds to shuffle files served by an executor (internal shuffle) becoming unavailable.
+   *
+   * We will assume that we've lost all the shuffle blocks for the executor.
+   *
+   * @param execId id of the executor for which internal shuffle is unavailable
+   * @param currentEpoch epoch in which the failure was observed.
+   */
+  private[scheduler] def handleInternalShuffleFailure(execId: String, currentEpoch: Long): Unit = {
+    logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+    cleanShuffleOutputs((stage: ShuffleMapStage) => {
+      stage.removeOutputsOnExecutor(execId)
+    })
+  }
+
+  /**
+   * Responds to an external shuffle service becoming unavailable on a host.
+   *
+   * We assume that all shuffle blocks on that host have been lost, since the FetchFailed
+   * occurred while the external shuffle service was in use.
+   *
+   * @param host address of the host on which the external shuffle service is unavailable
+   * @param currentEpoch epoch in which the failure was observed; used to ignore stray fetch
+   *                     failures from earlier epochs so they do not re-trigger handling of an
+   *                     external shuffle service that is already known to be unavailable.
+   */
+  private[scheduler] def handleExternalShuffleFailure(host: String, currentEpoch: Long): Unit = {
+    if (!failedEpoch.contains(host) || failedEpoch(host) < currentEpoch) {
+      failedEpoch(host) = currentEpoch
+      logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
+      cleanShuffleOutputs((stage: ShuffleMapStage) => {
+        stage.removeOutputsOnHost(host)
+      })
     } else {
-      logDebug("Additional executor lost message for " + execId +
-        "(epoch " + currentEpoch + ")")
+      logDebug(("Additional shuffle files " +
+        "lost message for host: %s (epoch %d)").format(host, currentEpoch))
+    }
+  }
+
+  private[scheduler] def cleanShuffleOutputs(outputsCleaner: ShuffleMapStage => Unit): Unit = {
+    // TODO: This will be really slow if we keep accumulating shuffle map stages
+    for ((shuffleId, stage) <- shuffleIdToMapStage) {
+      outputsCleaner(stage)
+      mapOutputTracker.registerMapOutputs(
+        shuffleId,
+        stage.outputLocInMapOutputTrackerFormat(),
+        changeEpoch = true)
+    }
+    if (shuffleIdToMapStage.isEmpty) {
+      mapOutputTracker.incrementEpoch()
     }
+    clearCacheLocs()
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
-      logInfo("Host added was in lost list earlier: " + host)
+      logInfo("Executor %s added was in lost list earlier.".format(execId))
       failedEpoch -= execId
     }
+
+    if (failedEpoch.contains(host)) {
+      failedEpoch -= host
+    }
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
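Not part of the patch: a Spark-free sketch of the shape of the DAGScheduler changes above. The loop that re-registers map outputs is shared in cleanShuffleOutputs, and each caller only supplies the policy for which outputs were lost (a single executor's for internal shuffle, a whole host's when the external shuffle service dies). All names here (OutputsCleanerSketch, OutputLoc, cleanOutputs) are hypothetical.

// Standalone model: a "stage" is the list of surviving output locations per partition, and
// cleaning outputs is a shared walk parameterized by a predicate over locations.
object OutputsCleanerSketch {
  final case class OutputLoc(executorId: String, host: String)

  type Stage = Array[List[OutputLoc]]

  /** Shared walk over every stage; the predicate decides which outputs were lost. */
  def cleanOutputs(stages: Seq[Stage])(lost: OutputLoc => Boolean): Unit = {
    for (stage <- stages; partition <- stage.indices) {
      stage(partition) = stage(partition).filterNot(lost)
    }
  }

  def main(args: Array[String]): Unit = {
    val stage: Stage = Array(
      List(OutputLoc("exec-hostA-1", "hostA"), OutputLoc("exec-hostB-1", "hostB")),
      List(OutputLoc("exec-hostA-2", "hostA")))

    // Internal shuffle: only the failed executor's outputs are lost.
    cleanOutputs(Seq(stage))(_.executorId == "exec-hostA-1")

    // External shuffle service down on hostA: every output on that host is lost.
    cleanOutputs(Seq(stage))(_.host == "hostA")

    stage.foreach(println) // partition 0 keeps the hostB output, partition 1 is left empty
  }
}

The real code passes a ShuffleMapStage => Unit cleaner and re-registers outputs with the MapOutputTracker; the predicate-over-locations idea is the same one removeOutputsHelper uses in ShuffleMapStage below.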
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index db4d9efa2270c..0be268d3582fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -132,25 +132,45 @@ private[spark] class ShuffleMapStage(
     outputLocs.map(_.headOption.orNull)
   }
 
-  /**
-   * Removes all shuffle outputs associated with this executor. Note that this will also remove
-   * outputs which are served by an external shuffle server (if one exists), as they are still
-   * registered with this execId.
-   */
-  def removeOutputsOnExecutor(execId: String): Unit = {
+  private def removeOutputsHelper(locationChecker: BlockManagerId => Boolean): Boolean = {
     var becameUnavailable = false
     for (partition <- 0 until numPartitions) {
       val prevList = outputLocs(partition)
-      val newList = prevList.filterNot(_.location.executorId == execId)
+      val newList = prevList.filterNot(status => locationChecker(status.location))
       outputLocs(partition) = newList
       if (prevList != Nil && newList == Nil) {
         becameUnavailable = true
         _numAvailableOutputs -= 1
       }
     }
+    becameUnavailable
+  }
+
+  /**
+   * Removes all shuffle outputs associated with this executor. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
+  def removeOutputsOnExecutor(execId: String): Unit = {
+    val becameUnavailable = removeOutputsHelper(
+      (blockManagerId: BlockManagerId) => { blockManagerId.executorId == execId })
+
     if (becameUnavailable) {
       logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
         this, execId, _numAvailableOutputs, numPartitions, isAvailable))
     }
   }
+
+  /**
+   * Removes all shuffle outputs associated with the external shuffle service on this host.
+   */
+  def removeOutputsOnHost(host: String): Unit = {
+    val becameUnavailable = removeOutputsHelper(
+      (blockManagerId: BlockManagerId) => { blockManagerId.host == host })
+
+    if (becameUnavailable) {
+      logInfo("%s is now unavailable on host %s (%d/%d, %s)".format(
+        this, host, _numAvailableOutputs, numPartitions, isAvailable))
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a10941b579fe2..ce7e813eb5136 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -674,6 +674,41 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  private val shuffleFetchFailureTests = Seq(
+    ("fetch failure with external shuffle service enabled", true, Set(0, 1, 2, 4)),
+    ("fetch failure with external shuffle service disabled", false, Set(0, 1)))
+
+  for ((eventDescription, shuffleServiceOn, expectedPartitionsLost)
+      <- shuffleFetchFailureTests) {
+    test(eventDescription) {
+      afterEach()
+      val conf = new SparkConf()
+      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+      init(conf)
+      assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+      val shuffleMapRdd = new MyRDD(sc, 5, Nil)
+      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+      val shuffleId = shuffleDep.shuffleId
+      val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+      submit(reduceRdd, Array(0, 1))
+
+      complete(taskSets(0), Seq(
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-1"))),
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-1"))),
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-2"))),
+        (Success, makeMapStatus("hostB", reduceRdd.partitions.length, 5, Some("exec-hostB-1"))),
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length, 5, Some("exec-hostA-2")))))
+
+      complete(taskSets(1), Seq(
+        (Success, 42),
+        (FetchFailed(makeBlockManagerId("hostA", Some("exec-hostA-1")),
+          shuffleId, 0, 0, "ignored"), null)))
+      scheduler.resubmitFailedStages()
+      assert(taskSets(2).tasks.map(_.partitionId).toSet === expectedPartitionsLost)
+    }
+  }
+
   // Helper function to validate state when creating tests for task failures
   private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
     assert(stageAttempt.stageId === stageId)
@@ -2330,9 +2365,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
 
 object DAGSchedulerSuite {
-  def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
+  def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2,
+      execId: Option[String] = None): MapStatus =
+    MapStatus(makeBlockManagerId(host, execId), Array.fill[Long](reduces)(sizes))
 
-  def makeBlockManagerId(host: String): BlockManagerId =
-    BlockManagerId("exec-" + host, host, 12345)
+  def makeBlockManagerId(host: String, execId: Option[String] = None): BlockManagerId =
+    BlockManagerId(execId.getOrElse("exec-" + host), host, 12345)
 }
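Not part of the patch: a small standalone re-derivation of the expected partition sets asserted by the new DAGSchedulerSuite test. Map tasks 0-4 run on the (executor, host) pairs registered via makeMapStatus above, and a FetchFailed pointing at exec-hostA-1 on hostA invalidates either that one executor's outputs or everything on hostA, depending on whether the external shuffle service is enabled. The names here (ExpectedPartitionsSketch, MapTask, lostPartitions) are made up for illustration.

object ExpectedPartitionsSketch {
  // Mirrors the map statuses completed for taskSets(0) in the test above.
  final case class MapTask(partition: Int, executorId: String, host: String)

  val placement: Seq[MapTask] = Seq(
    MapTask(0, "exec-hostA-1", "hostA"),
    MapTask(1, "exec-hostA-1", "hostA"),
    MapTask(2, "exec-hostA-2", "hostA"),
    MapTask(3, "exec-hostB-1", "hostB"),
    MapTask(4, "exec-hostA-2", "hostA"))

  /** Partitions whose map output is lost after a FetchFailed blaming exec-hostA-1 on hostA. */
  def lostPartitions(externalShuffleService: Boolean): Set[Int] =
    placement.collect {
      case t if externalShuffleService && t.host == "hostA" => t.partition
      case t if !externalShuffleService && t.executorId == "exec-hostA-1" => t.partition
    }.toSet

  def main(args: Array[String]): Unit = {
    assert(lostPartitions(externalShuffleService = true) == Set(0, 1, 2, 4))
    assert(lostPartitions(externalShuffleService = false) == Set(0, 1))
    println("expected partition sets match the test table above")
  }
}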