100 changes: 67 additions & 33 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -171,13 +171,34 @@ private[spark] class DAGScheduler(
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
// every task. When we detect a node failing, we note the current epoch number and failed
// executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
/**
* Tracks the latest epoch of a fully processed error related to the given executor. (We use
* the MapOutputTracker's epoch number, which is sent with every task.)
*
* When an executor fails, it can affect the results of many tasks, and we have to deal with
* all of them consistently. We don't simply ignore all future results from that executor,
* as the failures may have been transient; but we also don't want to "overreact" to follow-
* on errors we receive. Furthermore, we might receive notification of a task success, after
* we find out the executor has actually failed; we'll assume those successes are, in fact,
* simply delayed notifications and the results have been lost, if the tasks started in the
* same or an earlier epoch. In particular, we use this to control when we tell the
* BlockManagerMaster that the BlockManager has been lost.
*/
private val executorFailureEpoch = new HashMap[String, Long]

/**
* Tracks the latest epoch of a fully processed error where shuffle files have been lost from
* the given executor.
*
* This is closely related to executorFailureEpoch. They only differ for the executor when
* there is an external shuffle service serving shuffle files and we haven't been notified that
* the entire worker has been lost. In that case, when an executor is lost, we do not update
* the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
* fails, we do not unregister the shuffle data as it can still be served; but if there is
* a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
* data only once, even if we get many fetch failures.
*/
private val shuffleFileLostEpoch = new HashMap[String, Long]

private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
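
To make the epoch bookkeeping above concrete, here is a minimal, self-contained sketch of the gating rule; names such as `markExecutorFailed` and `isStrayResult` are hypothetical stand-ins for the real DAGScheduler/MapOutputTracker plumbing. The idea: a ShuffleMapTask result is treated as stray when its task was launched in the same or an earlier epoch than the executor's recorded failure.

```scala
import scala.collection.mutable.HashMap

// Simplified model of the epoch gating described above; not the real API.
object EpochGatingSketch {
  private val executorFailureEpoch = new HashMap[String, Long]
  private var currentEpoch = 0L // stands in for the MapOutputTracker's epoch

  // On the first fully processed failure of an executor in this epoch,
  // record the epoch and bump it so newly launched tasks are trusted.
  def markExecutorFailed(execId: String): Unit = {
    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
      executorFailureEpoch(execId) = currentEpoch
      currentEpoch += 1
    }
  }

  // A task result is "stray" if the task started in the same or an earlier
  // epoch than the executor's recorded failure.
  def isStrayResult(execId: String, taskEpoch: Long): Boolean =
    executorFailureEpoch.get(execId).exists(taskEpoch <= _)
}
```

Under this rule, a delayed success notification from a failed executor is ignored, while results from tasks launched after the failure (and thus carrying a later epoch) are accepted.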

Expand Down Expand Up @@ -1389,7 +1410,8 @@ private[spark] class DAGScheduler(
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
if (executorFailureEpoch.contains(execId) &&
smt.epoch <= executorFailureEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
Expand Down Expand Up @@ -1725,12 +1747,8 @@ private[spark] class DAGScheduler(
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* We will also assume that we've lost all shuffle blocks associated with the executor if the
* executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
* is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
* presume all shuffle data related to this executor to be lost.
*
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
* executor serves its own blocks (i.e., we're not using an external shuffle service), or the
* entire Standalone worker is lost.
*/
private[scheduler] def handleExecutorLost(
execId: String,
@@ -1746,29 +1764,44 @@
maybeEpoch = None)
}

/**
* Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
* outputs for the executor or optionally its host.
*
* @param execId executor to be removed
* @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
* with the executor; this happens if the executor serves its own blocks (i.e., we're not
* using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
* occurred (in which case we presume all shuffle data related to this executor to be lost).
* @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
* outputs on the host
* @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
* reprocessing for follow-on fetch failures)
*/
private def removeExecutorAndUnregisterOutputs(
execId: String,
fileLost: Boolean,
hostToUnregisterOutputs: Option[String],
maybeEpoch: Option[Long] = None): Unit = {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
logDebug(s"Considering removal of executor $execId; " +
s"fileLost: $fileLost, currentEpoch: $currentEpoch")
if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
executorFailureEpoch(execId) = currentEpoch
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
blockManagerMaster.removeExecutor(execId)
if (fileLost) {
hostToUnregisterOutputs match {
case Some(host) =>
logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
mapOutputTracker.removeOutputsOnHost(host)
case None =>
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
mapOutputTracker.removeOutputsOnExecutor(execId)
}
clearCacheLocs()

} else {
logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
clearCacheLocs()
}
if (fileLost &&
(!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
shuffleFileLostEpoch(execId) = currentEpoch
hostToUnregisterOutputs match {
case Some(host) =>
logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
mapOutputTracker.removeOutputsOnHost(host)
case None =>
logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
mapOutputTracker.removeOutputsOnExecutor(execId)
}
}
}
Expand All @@ -1794,11 +1827,12 @@ private[spark] class DAGScheduler(
}

private[scheduler] def handleExecutorAdded(execId: String, host: String) {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
// remove from executorFailureEpoch(execId) ?
if (executorFailureEpoch.contains(execId)) {
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
executorFailureEpoch -= execId
}
shuffleFileLostEpoch -= execId
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
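
The core of the scheduler-side fix is that the two epoch guards in `removeExecutorAndUnregisterOutputs` are now independent. The following sketch (collaborators stubbed out; a simplified model, not the real method) shows the shape of the logic: the executor's BlockManager is removed at most once per epoch regardless of `fileLost`, while shuffle outputs are unregistered at most once per epoch, and only when files are actually considered lost.

```scala
import scala.collection.mutable.HashMap

// Hypothetical sketch of the two independent epoch guards; the stub methods
// stand in for BlockManagerMaster / MapOutputTracker calls.
class RemovalSketch {
  private val executorFailureEpoch = new HashMap[String, Long]
  private val shuffleFileLostEpoch = new HashMap[String, Long]

  def removeExecutorAndUnregisterOutputs(
      execId: String,
      fileLost: Boolean,
      currentEpoch: Long): Unit = {
    // Guard 1: remove the executor's BlockManager at most once per epoch,
    // whether or not its shuffle files are also considered lost.
    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
      executorFailureEpoch(execId) = currentEpoch
      removeBlockManager(execId)
    }
    // Guard 2: unregister shuffle outputs only when files are actually lost,
    // and again at most once per epoch even under repeated fetch failures.
    if (fileLost &&
        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
      shuffleFileLostEpoch(execId) = currentEpoch
      unregisterShuffleOutputs(execId)
    }
  }

  private def removeBlockManager(execId: String): Unit =
    println(s"removeExecutor($execId)")
  private def unregisterShuffleOutputs(execId: String): Unit =
    println(s"removeOutputsOnExecutor($execId)")
}
```

With an external shuffle service, an executor loss arrives with `fileLost = false`, so only the first guard fires; a later fetch failure arrives with `fileLost = true` and fires the second guard exactly once, no matter how many fetch failures follow in that epoch.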
101 changes: 84 additions & 17 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal

import org.mockito.Mockito.spy
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

Expand Down Expand Up @@ -223,6 +226,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

var sparkListener: EventInfoRecordingListener = null

var blockManagerMaster: BlockManagerMaster = null
var mapOutputTracker: MapOutputTrackerMaster = null
var broadcastManager: BroadcastManager = null
var securityMgr: SecurityManager = null
@@ -236,17 +240,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
getOrElse(Seq())
}.toIndexedSeq
}
override def removeExecutor(execId: String) {
// don't need to propagate to the driver, which we don't have
}
class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, conf, true) {
override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map { id => (id.rddId -> id.splitIndex)
}.flatMap { key => cacheLocations.get(key)
}.getOrElse(Seq())
}.toIndexedSeq
}
override def removeExecutor(execId: String): Unit = {
// don't need to propagate to the driver, which we don't have
}
}

/** The list of results that DAGScheduler has collected. */
val results = new HashMap[Int, Any]()
@@ -264,6 +269,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def jobFailed(exception: Exception): Unit = { failure = exception }
}

class MyMapOutputTrackerMaster(
conf: SparkConf,
broadcastManager: BroadcastManager)
extends MapOutputTrackerMaster(conf, broadcastManager, true) {

override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}

override def beforeEach(): Unit = {
super.beforeEach()
init(new SparkConf())
@@ -280,11 +295,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}
mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
blockManagerMaster = spy(new MyBlockManagerMaster(conf))
scheduler = new DAGScheduler(
sc,
taskScheduler,
Expand Down Expand Up @@ -520,6 +532,59 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mapStatus2(2).location.host === "hostB")
}

test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
init(conf)

val shuffleMapRdd = new MyRDD(sc, 3, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)

submit(reduceRdd, Array(0, 1, 2))
// Map stage completes successfully,
// two tasks are run on an executor on hostA and one on an executor on hostB
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 3)),
(Success, makeMapStatus("hostA", 3)),
(Success, makeMapStatus("hostB", 3))))
// Now the executor on hostA is lost
runEvent(ExecutorLost("exec-hostA", ExecutorExited(-100, false, "Container marked as failed")))
// Executor is removed but shuffle files are not unregistered
verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")

// The MapOutputTracker has all the shuffle files
val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
assert(mapStatuses.count(_ != null) === 3)
assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 2)
assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)

// Now a fetch failure from the lost executor occurs
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)
))
// blockManagerMaster.removeExecutor is not called again
// but shuffle files are unregistered
verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")

// Shuffle files for exec-hostA should be lost
assert(mapStatuses.count(_ != null) === 1)
assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 0)
assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)

// Additional fetch failure from the executor does not result in further call to
// mapOutputTracker.removeOutputsOnExecutor
complete(taskSets(1), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 0, "ignored"), null)
))
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
}

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None
Expand Down Expand Up @@ -741,8 +806,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
complete(taskSets(1), Seq(
(Success, 42),
(FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
// this will get called
// blockManagerMaster.removeExecutor("exec-hostA")
verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
Expand Down Expand Up @@ -785,11 +849,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
runEvent(ExecutorLost("exec-hostA", event))
verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
if (expectFileLoss) {
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
intercept[MetadataFetchFailedException] {
mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
}
} else {
verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
}
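
For readers unfamiliar with the Mockito pattern the updated suite relies on: `spy` wraps a real object so its behavior is unchanged while invocations are recorded, and `verify(..., times(n))` asserts exact call counts. A minimal sketch follows; the `Tracker` class here is illustrative, not a Spark type.

```scala
import org.mockito.Mockito.{spy, times, verify}

// A plain collaborator whose real behavior we want to keep while counting calls.
class Tracker {
  def removeOutputsOnExecutor(execId: String): Unit = { /* real work */ }
}

object SpySketch {
  def demo(): Unit = {
    val tracker = spy(new Tracker)
    tracker.removeOutputsOnExecutor("exec-hostA")
    // Passes only if the method was invoked exactly once with this argument.
    verify(tracker, times(1)).removeOutputsOnExecutor("exec-hostA")
    // times(0) asserts the method was never called with this argument.
    verify(tracker, times(0)).removeOutputsOnExecutor("exec-hostB")
  }
}
```

This is why the test can assert that `removeExecutor` fires once on executor loss while `removeOutputsOnExecutor` fires only after the first fetch failure, and never again for repeated failures in the same epoch.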