Skip to content

Commit 93a4852

Browse files
committed
conservatively only invalidate in standalone mode
1 parent 2430b69 commit 93a4852

File tree

4 files changed

+14
-5
lines changed

4 files changed

+14
-5
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1627,7 +1627,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
16271627
dagScheduler.handleExecutorAdded(execId, host)
16281628

16291629
case ExecutorLost(execId, reason) =>
1630-
dagScheduler.handleExecutorLost(execId, filesLost = reason.isInstanceOf[SlaveLost])
1630+
val filesLost = reason match {
1631+
case SlaveLost(_, true) => true
1632+
case _ => false
1633+
}
1634+
dagScheduler.handleExecutorLost(execId, filesLost)
16311635

16321636
case BeginEvent(task, taskInfo) =>
16331637
dagScheduler.handleBeginEvent(task, taskInfo)

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed
5151
*/
5252
private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
5353

54+
/**
55+
* @param _message human readable loss reason
56+
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
57+
*/
5458
private[spark]
55-
case class SlaveLost(_message: String = "Slave lost")
59+
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
5660
extends ExecutorLossReason(_message)

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private[spark] class StandaloneSchedulerBackend(
153153
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
154154
val reason: ExecutorLossReason = exitStatus match {
155155
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
156-
case None => SlaveLost(message)
156+
case None => SlaveLost(message, workerLost = true /* worker loss event from master */)
157157
}
158158
logInfo("Executor %s removed: %s".format(fullId, message))
159159
removeExecutor(fullId.split("/")(1), reason)

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,8 +627,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
627627
}
628628

629629
private val shuffleFileLossTests = Seq(
630-
("slave lost with shuffle service", SlaveLost(), true, true),
631-
("slave lost without shuffle service", SlaveLost(), false, true),
630+
("slave lost with shuffle service", SlaveLost("", false), true, false),
631+
("worker lost with shuffle service", SlaveLost("", true), true, true),
632+
("worker lost without shuffle service", SlaveLost("", true), false, true),
632633
("executor failure with shuffle service", ExecutorKilled, true, false),
633634
("executor failure without shuffle service", ExecutorKilled, false, true))
634635

0 commit comments

Comments
 (0)