Skip to content

Commit 078ac0e

Browse files
ericl authored and JoshRosen committed
[SPARK-17370] Shuffle service files not invalidated when a slave is lost
## What changes were proposed in this pull request?

DAGScheduler invalidates shuffle files when an executor loss event occurs, but not when the external shuffle service is enabled. This is because when shuffle service is on, the shuffle file lifetime can exceed the executor lifetime. However, it also doesn't invalidate shuffle files when the shuffle service itself is lost (due to whole slave loss). This can cause long hangs when slaves are lost since the file loss is not detected until a subsequent stage attempts to read the shuffle files. The proposed fix is to also invalidate shuffle files when an executor is lost due to a `SlaveLost` event.

## How was this patch tested?

Unit tests, also verified on an actual cluster that slave loss invalidates shuffle files immediately as expected.

cc mateiz

Author: Eric Liang <[email protected]>

Closes #14931 from ericl/sc-4439.

(cherry picked from commit 649fa4b)
Signed-off-by: Josh Rosen <[email protected]>
1 parent e6caceb commit 078ac0e

File tree

12 files changed

+92
-31
lines changed

12 files changed

+92
-31
lines changed

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private[deploy] object DeployMessages {
135135
}
136136

137137
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
138-
exitStatus: Option[Int])
138+
exitStatus: Option[Int], workerLost: Boolean)
139139

140140
case class ApplicationRemoved(message: String)
141141

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,12 @@ private[spark] class StandaloneAppClient(
177177
cores))
178178
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
179179

180-
case ExecutorUpdated(id, state, message, exitStatus) =>
180+
case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
181181
val fullId = appId + "/" + id
182182
val messageText = message.map(s => " (" + s + ")").getOrElse("")
183183
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
184184
if (ExecutorState.isFinished(state)) {
185-
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
185+
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
186186
}
187187

188188
case MasterChanged(masterRef, masterWebUiUrl) =>

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener {
3636
def executorAdded(
3737
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
3838

39-
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
39+
def executorRemoved(
40+
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
4041
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ private[deploy] class Master(
252252
appInfo.resetRetryCount()
253253
}
254254

255-
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
255+
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
256256

257257
if (ExecutorState.isFinished(state)) {
258258
// Remove this executor from the worker and app
@@ -766,7 +766,7 @@ private[deploy] class Master(
766766
for (exec <- worker.executors.values) {
767767
logInfo("Telling app of lost executor: " + exec.id)
768768
exec.application.driver.send(ExecutorUpdated(
769-
exec.id, ExecutorState.LOST, Some("worker lost"), None))
769+
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
770770
exec.state = ExecutorState.LOST
771771
exec.application.removeExecutor(exec)
772772
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ class DAGScheduler(
233233
/**
234234
* Called by TaskScheduler implementation when an executor fails.
235235
*/
236-
def executorLost(execId: String): Unit = {
237-
eventProcessLoop.post(ExecutorLost(execId))
236+
def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
237+
eventProcessLoop.post(ExecutorLost(execId, reason))
238238
}
239239

240240
/**
@@ -1297,7 +1297,7 @@ class DAGScheduler(
12971297

12981298
// TODO: mark the executor as failed only if there were lots of fetch failures on it
12991299
if (bmAddress != null) {
1300-
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
1300+
handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
13011301
}
13021302
}
13031303

@@ -1323,23 +1323,25 @@ class DAGScheduler(
13231323
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
13241324
*
13251325
* We will also assume that we've lost all shuffle blocks associated with the executor if the
1326-
* executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
1327-
* occurred, in which case we presume all shuffle data related to this executor to be lost.
1326+
* executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
1327+
* is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
1328+
* presume all shuffle data related to this executor to be lost.
13281329
*
13291330
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
13301331
* stray fetch failures from possibly retriggering the detection of a node as lost.
13311332
*/
13321333
private[scheduler] def handleExecutorLost(
13331334
execId: String,
1334-
fetchFailed: Boolean,
1335+
filesLost: Boolean,
13351336
maybeEpoch: Option[Long] = None) {
13361337
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
13371338
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
13381339
failedEpoch(execId) = currentEpoch
13391340
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
13401341
blockManagerMaster.removeExecutor(execId)
13411342

1342-
if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
1343+
if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
1344+
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
13431345
// TODO: This will be really slow if we keep accumulating shuffle map stages
13441346
for ((shuffleId, stage) <- shuffleToMapStage) {
13451347
stage.removeOutputsOnExecutor(execId)
@@ -1643,8 +1645,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
16431645
case ExecutorAdded(execId, host) =>
16441646
dagScheduler.handleExecutorAdded(execId, host)
16451647

1646-
case ExecutorLost(execId) =>
1647-
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
1648+
case ExecutorLost(execId, reason) =>
1649+
val filesLost = reason match {
1650+
case SlaveLost(_, true) => true
1651+
case _ => false
1652+
}
1653+
dagScheduler.handleExecutorLost(execId, filesLost)
16481654

16491655
case BeginEvent(task, taskInfo) =>
16501656
dagScheduler.handleBeginEvent(task, taskInfo)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent(
7777

7878
private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
7979

80-
private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
80+
private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
81+
extends DAGSchedulerEvent
8182

8283
private[scheduler]
8384
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed
5151
*/
5252
private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
5353

54+
/**
55+
* @param _message human readable loss reason
56+
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
57+
*/
5458
private[spark]
55-
case class SlaveLost(_message: String = "Slave lost")
59+
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
5660
extends ExecutorLossReason(_message)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,15 +332,17 @@ private[spark] class TaskSchedulerImpl(
332332

333333
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
334334
var failedExecutor: Option[String] = None
335+
var reason: Option[ExecutorLossReason] = None
335336
synchronized {
336337
try {
337338
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
338339
// We lost this entire executor, so remember that it's gone
339340
val execId = taskIdToExecutorId(tid)
340341

341342
if (executorIdToTaskCount.contains(execId)) {
342-
removeExecutor(execId,
343+
reason = Some(
343344
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
345+
removeExecutor(execId, reason.get)
344346
failedExecutor = Some(execId)
345347
}
346348
}
@@ -373,7 +375,8 @@ private[spark] class TaskSchedulerImpl(
373375
}
374376
// Update the DAGScheduler without holding a lock on this, since that can deadlock
375377
if (failedExecutor.isDefined) {
376-
dagScheduler.executorLost(failedExecutor.get)
378+
assert(reason.isDefined)
379+
dagScheduler.executorLost(failedExecutor.get, reason.get)
377380
backend.reviveOffers()
378381
}
379382
}
@@ -499,7 +502,7 @@ private[spark] class TaskSchedulerImpl(
499502
}
500503
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
501504
if (failedExecutor.isDefined) {
502-
dagScheduler.executorLost(failedExecutor.get)
505+
dagScheduler.executorLost(failedExecutor.get, reason)
503506
backend.reviveOffers()
504507
}
505508
}

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,11 @@ private[spark] class StandaloneSchedulerBackend(
148148
fullId, hostPort, cores, Utils.megabytesToString(memory)))
149149
}
150150

151-
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
151+
override def executorRemoved(
152+
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) {
152153
val reason: ExecutorLossReason = exitStatus match {
153154
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
154-
case None => SlaveLost(message)
155+
case None => SlaveLost(message, workerLost = workerLost)
155156
}
156157
logInfo("Executor %s removed: %s".format(fullId, message))
157158
removeExecutor(fullId.split("/")(1), reason)

core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
196196
execAddedList.add(id)
197197
}
198198

199-
def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
199+
def executorRemoved(
200+
id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
200201
execRemovedList.add(id)
201202
}
202203
}

0 commit comments

Comments
 (0)