diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb024d0852d0..73c95d19387c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -170,13 +170,34 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
-  // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
-  //
-  // TODO: Garbage collect information about failure epochs when we know there are no more
-  // stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  /**
+   * Tracks the latest epoch of a fully processed error related to the given executor. (We use
+   * the MapOutputTracker's epoch number, which is sent with every task.)
+   *
+   * When an executor fails, it can affect the results of many tasks, and we have to deal with
+   * all of them consistently. We don't simply ignore all future results from that executor,
+   * as the failures may have been transient; but we also don't want to "overreact" to follow-
+   * on errors we receive. Furthermore, we might receive notification of a task success, after
+   * we find out the executor has actually failed; we'll assume those successes are, in fact,
+   * simply delayed notifications and the results have been lost, if the tasks started in the
+   * same or an earlier epoch. In particular, we use this to control when we tell the
+   * BlockManagerMaster that the BlockManager has been lost.
+   */
+  private val executorFailureEpoch = new HashMap[String, Long]
+
+  /**
+   * Tracks the latest epoch of a fully processed error where shuffle files have been lost from
+   * the given executor.
+   *
+   * This is closely related to executorFailureEpoch. They only differ for the executor when
+   * there is an external shuffle service serving shuffle files and we haven't been notified that
+   * the entire worker has been lost. In that case, when an executor is lost, we do not update
+   * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
+   * fails, we do not unregister the shuffle data as it can still be served; but if there is
+   * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
+   * data only once, even if we get many fetch failures.
+   */
+  private val shuffleFileLostEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1566,7 +1587,8 @@ private[spark] class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
+            if (executorFailureEpoch.contains(execId) &&
+                smt.epoch <= executorFailureEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
               // The epoch of the task is acceptable (i.e., the task was launched after the most
@@ -1912,12 +1934,8 @@ private[spark] class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle), the entire executor
-   * process is lost (likely including the shuffle service), or a FetchFailed occurred, in which
-   * case we presume all shuffle data related to this executor to be lost.
-   *
-   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
-   * stray fetch failures from possibly retriggering the detection of a node as lost.
+   * executor serves its own blocks (i.e., we're not using an external shuffle service), or the
+   * entire Standalone worker is lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
@@ -1933,29 +1951,44 @@ private[spark] class DAGScheduler(
       maybeEpoch = None)
   }
 
+  /**
+   * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
+   * outputs for the executor or optionally its host.
+   *
+   * @param execId executor to be removed
+   * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
+   *   with the executor; this happens if the executor serves its own blocks (i.e., we're not
+   *   using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
+   *   occurred (in which case we presume all shuffle data related to this executor to be lost).
+   * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
+   *   outputs on the host
+   * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
+   *   reprocessing for follow-on fetch failures)
+   */
   private def removeExecutorAndUnregisterOutputs(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")
+    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+      executorFailureEpoch(execId) = currentEpoch
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
+      clearCacheLocs()
+    }
+    if (fileLost &&
+        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
+      shuffleFileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
     }
   }
@@ -1981,11 +2014,12 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
-    // remove from failedEpoch(execId) ?
-    if (failedEpoch.contains(execId)) {
+    // remove from executorFailureEpoch(execId) ?
+    if (executorFailureEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
-      failedEpoch -= execId
+      executorFailureEpoch -= execId
     }
+    shuffleFileLostEpoch -= execId
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7013832757e3..664cfc88cc41 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.util.control.NonFatal
 
+import org.mockito.Mockito.spy
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.SpanSugar._
@@ -235,6 +238,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   var sparkListener: EventInfoRecordingListener = null
 
+  var blockManagerMaster: BlockManagerMaster = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -248,17 +252,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
-      override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-        blockIds.map {
-          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
-            getOrElse(Seq())
-        }.toIndexedSeq
-      }
-      override def removeExecutor(execId: String): Unit = {
-        // don't need to propagate to the driver, which we don't have
-      }
+  class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf, true) {
+    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
+      blockIds.map {
+        _.asRDDId.map { id => (id.rddId -> id.splitIndex)
+        }.flatMap { key => cacheLocations.get(key)
+        }.getOrElse(Seq())
+      }.toIndexedSeq
     }
+    override def removeExecutor(execId: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
+  }
 
   /** The list of results that DAGScheduler has collected. */
   val results = new HashMap[Int, Any]()
@@ -276,6 +281,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def jobFailed(exception: Exception): Unit = { failure = exception }
   }
 
+  class MyMapOutputTrackerMaster(
+      conf: SparkConf,
+      broadcastManager: BroadcastManager)
+    extends MapOutputTrackerMaster(conf, broadcastManager, true) {
+
+    override def sendTracker(message: Any): Unit = {
+      // no-op, just so we can stop this to avoid leaking threads
+    }
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     init(new SparkConf())
@@ -293,11 +308,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     results.clear()
     securityMgr = new SecurityManager(conf)
     broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
-      override def sendTracker(message: Any): Unit = {
-        // no-op, just so we can stop this to avoid leaking threads
-      }
-    }
+    mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
+    blockManagerMaster = spy(new MyBlockManagerMaster(conf))
     scheduler = new DAGScheduler(
       sc,
       taskScheduler,
@@ -548,6 +560,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
+    // Executor is removed but shuffle files are not unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
+
+    // The MapOutputTracker has all the shuffle files
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) === 3)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+    // blockManagerMaster.removeExecutor is not called again
+    // but shuffle files are unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+
+    // Shuffle files for hostA-exec should be lost
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
+
+    // Additional fetch failure from the executor does not result in further call to
+    // mapOutputTracker.removeOutputsOnExecutor
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 1, 0, "ignored"), null)
+    ))
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
@@ -765,8 +827,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
-    // this will get called
-    // blockManagerMaster.removeExecutor("hostA-exec")
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -806,11 +867,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(reduceRdd, Array(0))
     completeShuffleMapStageSuccessfully(0, 0, 1)
     runEvent(ExecutorLost("hostA-exec", event))
+    verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
     if (expectFileLoss) {
+      verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
       intercept[MetadataFetchFailedException] {
         mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
       }
     } else {
+      verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
       assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
         HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
     }
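Reviewer note, not part of the patch: a minimal, self-contained sketch of the two-epoch bookkeeping described in the new doc comments. The map names mirror executorFailureEpoch and shuffleFileLostEpoch, but the simplified handleExecutorLost signature and the removeBlockManager / unregisterShuffleFiles stand-ins (for BlockManagerMaster.removeExecutor and MapOutputTracker.removeOutputsOnExecutor) are assumptions made for illustration only.

```scala
import scala.collection.mutable

// Sketch only: executor removal and shuffle-file unregistration are gated by separate
// per-executor epoch high-water marks, so each action happens at most once per epoch.
object EpochGatingSketch {
  private val executorFailureEpoch = mutable.HashMap.empty[String, Long]
  private val shuffleFileLostEpoch = mutable.HashMap.empty[String, Long]

  // Hypothetical stand-ins for BlockManagerMaster.removeExecutor and
  // MapOutputTracker.removeOutputsOnExecutor.
  private def removeBlockManager(execId: String): Unit =
    println(s"removing block manager for $execId")
  private def unregisterShuffleFiles(execId: String): Unit =
    println(s"unregistering shuffle files on $execId")

  // fileLost = false models an executor loss while an external shuffle service can still
  // serve its files; fileLost = true models a fetch failure (or no external shuffle service).
  def handleExecutorLost(execId: String, fileLost: Boolean, eventEpoch: Long): Unit = {
    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < eventEpoch) {
      executorFailureEpoch(execId) = eventEpoch
      removeBlockManager(execId) // done once per epoch, even for repeated loss events
    }
    if (fileLost &&
        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < eventEpoch)) {
      shuffleFileLostEpoch(execId) = eventEpoch
      unregisterShuffleFiles(execId) // deferred until the shuffle files are really gone
    }
  }

  def main(args: Array[String]): Unit = {
    // Executor lost while the external shuffle service still serves its files:
    // the block manager is removed, shuffle output stays registered.
    handleExecutorLost("exec-1", fileLost = false, eventEpoch = 0L)
    // A fetch failure caught in the same epoch: shuffle output is unregistered exactly once,
    // and the executor is not removed a second time.
    handleExecutorLost("exec-1", fileLost = true, eventEpoch = 0L)
    // A stray follow-on fetch failure from the same epoch is a no-op for both maps.
    handleExecutorLost("exec-1", fileLost = true, eventEpoch = 0L)
  }
}
```

This is also why the new test verifies removeExecutor exactly once after the ExecutorLost event, and removeOutputsOnExecutor only after the first fetch failure and not again after the second.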