core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala

@@ -135,7 +135,7 @@ private[deploy] object DeployMessages {
   }

   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int])
+    exitStatus: Option[Int], workerLost: Boolean)

   case class ApplicationRemoved(message: String)
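Note: the widened message is the wire-level change that lets worker loss propagate from the standalone master to the driver. The two constructor calls below restate the Master.scala call sites from later in this diff (shown out of context, not self-contained): an ordinary executor state change versus whole-worker loss.

```scala
// Restated from the Master.scala hunks below: here the worker is still up...
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
// ...but here the worker itself (and any shuffle service on it) is gone.
exec.application.driver.send(ExecutorUpdated(
  exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
```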
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

@@ -174,12 +174,12 @@ private[spark] class StandaloneAppClient(
             cores))
           listener.executorAdded(fullId, workerId, hostPort, cores, memory)

-        case ExecutorUpdated(id, state, message, exitStatus) =>
+        case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
           val fullId = appId + "/" + id
           val messageText = message.map(s => " (" + s + ")").getOrElse("")
           logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
           if (ExecutorState.isFinished(state)) {
-            listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+            listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
           }

         case MasterChanged(masterRef, masterWebUiUrl) =>
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

@@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener {
   def executorAdded(
       fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

-  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+  def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 }
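Because this is an abstract method on the trait, every implementor must pick up the new parameter; the two touched in this diff are StandaloneSchedulerBackend and the test listener in AppClientSuite. A minimal sketch of a hypothetical implementor follows; the other member signatures are taken from the Spark source of this era, so treat them as assumptions.

```scala
package org.apache.spark.deploy.client  // the trait is private[spark]

// Hypothetical implementor showing the knock-on effect of the trait change.
class LoggingAppClientListener extends StandaloneAppClientListener {
  def connected(appId: String): Unit = println(s"connected to $appId")
  def disconnected(): Unit = println("disconnected")
  def dead(reason: String): Unit = println(s"dead: $reason")
  def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = ()
  def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit =
    println(s"executor $fullId removed (workerLost = $workerLost): $message")
}
```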
core/src/main/scala/org/apache/spark/deploy/master/Master.scala

@@ -252,7 +252,7 @@ private[deploy] class Master(
             appInfo.resetRetryCount()
           }

-          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
@@ -766,7 +766,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (24 changes: 15 additions & 9 deletions)

@@ -239,8 +239,8 @@ class DAGScheduler(
   /**
    * Called by TaskScheduler implementation when an executor fails.
    */
-  def executorLost(execId: String): Unit = {
-    eventProcessLoop.post(ExecutorLost(execId))
+  def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
+    eventProcessLoop.post(ExecutorLost(execId, reason))
   }

   /**
@@ -1281,7 +1281,7 @@

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
         }
       }

@@ -1306,23 +1306,25 @@
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
-   * occurred, in which case we presume all shuffle data related to this executor to be lost.
+   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
+   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
+   * presume all shuffle data related to this executor to be lost.
    *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      fetchFailed: Boolean,
+      filesLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)

-      if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
+        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
           stage.removeOutputsOnExecutor(execId)
@@ -1624,8 +1626,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
     case ExecutorAdded(execId, host) =>
       dagScheduler.handleExecutorAdded(execId, host)

-    case ExecutorLost(execId) =>
-      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
+    case ExecutorLost(execId, reason) =>
+      val filesLost = reason match {
+        case SlaveLost(_, true) => true
+        case _ => false
+      }
+      dagScheduler.handleExecutorLost(execId, filesLost)

     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
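The upshot of the DAGScheduler changes: map outputs on an executor are invalidated not only when the executor served its own shuffle blocks or a fetch actually failed, but now also when the worker hosting the external shuffle service is itself gone. A self-contained sketch of the combined decision, using hypothetical stand-in types rather than Spark's:

```scala
object ShuffleLossSketch {
  sealed trait LossReason
  final case class WorkerDown(workerLost: Boolean) extends LossReason // stands in for SlaveLost
  case object Killed extends LossReason                               // stands in for ExecutorKilled

  // Mirrors the event-loop match above plus the check inside handleExecutorLost.
  def shuffleOutputsLost(reason: LossReason, externalShuffleService: Boolean): Boolean = {
    val filesLost = reason match {
      case WorkerDown(true) => true  // the shuffle service died with the worker
      case _ => false
    }
    filesLost || !externalShuffleService  // without a service, files die with the executor
  }
}
```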
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

@@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent(

 private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent

-private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
+  extends DAGSchedulerEvent

 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
  */
 private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")

+/**
+ * @param _message human readable loss reason
+ * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ */
 private[spark]
-case class SlaveLost(_message: String = "Slave lost")
+case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
   extends ExecutorLossReason(_message)
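The default value keeps the many existing SlaveLost call sites compiling unchanged; only the standalone master path opts in. A minimal usage sketch (SlaveLost is private[spark], so this assumes code compiled inside an org.apache.spark subpackage with spark-core on the classpath):

```scala
package org.apache.spark.scheduler

object SlaveLostUsage {
  def main(args: Array[String]): Unit = {
    // Existing call sites are untouched by the new field:
    val executorOnly = SlaveLost("Task 42 was lost, so marking the executor as lost as well.")
    // The standalone master path can now flag whole-worker loss:
    val wholeWorker = SlaveLost("worker lost", workerLost = true)
    assert(!executorOnly.workerLost && wholeWorker.workerLost)
  }
}
```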
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -346,15 +346,17 @@ private[spark] class TaskSchedulerImpl(

   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
+    var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)

           if (executorIdToTaskCount.contains(execId)) {
-            removeExecutor(execId,
+            reason = Some(
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
+            removeExecutor(execId, reason.get)
             failedExecutor = Some(execId)
           }
         }
@@ -387,7 +389,8 @@
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      assert(reason.isDefined)
+      dagScheduler.executorLost(failedExecutor.get, reason.get)
       backend.reviveOffers()
     }
   }
@@ -513,7 +516,7 @@
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get, reason)
       backend.reviveOffers()
     }
   }
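The assert added in statusUpdate cannot fire because reason and failedExecutor are only ever assigned together, inside the same synchronized block. A toy standalone rendering of that invariant (not Spark code):

```scala
object PairedAssignment {
  private var failedExecutor: Option[String] = None
  private var reason: Option[String] = None

  def recordLoss(execId: String): Unit = synchronized {
    reason = Some(s"executor $execId lost")  // the two Options are set together...
    failedExecutor = Some(execId)            // ...under one lock
  }

  def main(args: Array[String]): Unit = {
    recordLoss("exec-1")
    if (failedExecutor.isDefined) assert(reason.isDefined)  // holds by construction
  }
}
```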
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

@@ -150,10 +150,11 @@ private[spark] class StandaloneSchedulerBackend(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

-  override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+  override def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => SlaveLost(message)
+      case None => SlaveLost(message, workerLost = workerLost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala

@@ -210,7 +210,8 @@ class AppClientSuite
       execAddedList.add(id)
     }

-    def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+    def executorRemoved(
+        id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
       execRemovedList.add(id)
     }
   }
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -31,6 +31,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}

@@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts

   override def beforeEach(): Unit = {
     super.beforeEach()
-    sc = new SparkContext("local", "DAGSchedulerSuite")
+    init(new SparkConf())
+  }
+
+  private def init(testConf: SparkConf): Unit = {
+    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()

@@ -621,14 +626,53 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
     assertDataStructuresEmpty()
   }

+  private val shuffleFileLossTests = Seq(
+    ("slave lost with shuffle service", SlaveLost("", false), true, false),
+    ("worker lost with shuffle service", SlaveLost("", true), true, true),
+    ("worker lost without shuffle service", SlaveLost("", true), false, true),
+    ("executor failure with shuffle service", ExecutorKilled, true, false),
+    ("executor failure without shuffle service", ExecutorKilled, false, true))
+
+  for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
+    val maybeLost = if (expectFileLoss) {
+      "lost"
+    } else {
+      "not lost"
+    }
+    test(s"shuffle files $maybeLost when $eventDescription") {
+      // reset the test context with the right shuffle service config
+      afterEach()
+      val conf = new SparkConf()
+      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+      init(conf)
+      assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+      val shuffleId = shuffleDep.shuffleId
+      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+      submit(reduceRdd, Array(0))
+      complete(taskSets(0), Seq(
+        (Success, makeMapStatus("hostA", 1)),
+        (Success, makeMapStatus("hostB", 1))))
+      runEvent(ExecutorLost("exec-hostA", event))
+      if (expectFileLoss) {
+        intercept[MetadataFetchFailedException] {
+          mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+        }
+      } else {
+        assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+          HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+      }
+    }
+  }
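The five cases reduce to a single predicate: shuffle files survive an executor loss only when an external shuffle service is running and the worker hosting it is still up. A condensed restatement of the expected matrix (hypothetical helper, not part of the suite):

```scala
object ExpectedFileLoss {
  // Of the events above, only SlaveLost(_, true) implies worker loss;
  // ExecutorKilled and SlaveLost(_, false) leave the worker running.
  def expectFileLoss(workerLost: Boolean, shuffleServiceOn: Boolean): Boolean =
    workerLost || !shuffleServiceOn

  def main(args: Array[String]): Unit = {
    assert(!expectFileLoss(workerLost = false, shuffleServiceOn = true)) // slave lost, service on
    assert(expectFileLoss(workerLost = true, shuffleServiceOn = true))   // worker lost, service on
    assert(expectFileLoss(workerLost = true, shuffleServiceOn = false))  // worker lost, service off
    assert(expectFileLoss(workerLost = false, shuffleServiceOn = false)) // executor killed, service off
  }
}
```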

   // Helper function to validate state when creating tests for task failures
   private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
     assert(stageAttempt.stageId === stageId)
     assert(stageAttempt.stageAttemptId == attempt)
   }

-
   // Helper functions to extract commonly used code in Fetch Failure test cases
   private def setupStageAbortTest(sc: SparkContext) {
     sc.listenerBus.addListener(new EndListener())

@@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts

     // pretend we were told hostA went away
     val oldEpoch = mapOutputTracker.getEpoch
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)

@@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
     ))

     // then one executor dies, and a task fails in stage 1
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(0),
       FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),

@@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
       makeMapStatus("hostA", reduceRdd.partitions.length)))

     // now that host goes down
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))

     // so we resubmit those tasks
     runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))

@@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
     submit(reduceRdd, Array(0))
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(

@@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts

     // Pretend host A was lost
     val oldEpoch = mapOutputTracker.getEpoch
-    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

@@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)

   override def executorAdded(execId: String, host: String) {}

-  override def executorLost(execId: String) {}
+  override def executorLost(execId: String, reason: ExecutorLossReason) {}

   override def taskSetFailed(
     taskSet: TaskSet,