
Commit 649fa4b

ericl authored and JoshRosen committed
[SPARK-17370] Shuffle service files not invalidated when a slave is lost
## What changes were proposed in this pull request?

DAGScheduler invalidates shuffle files when an executor loss event occurs, but not when the external shuffle service is enabled. This is because when the shuffle service is on, the shuffle file lifetime can exceed the executor lifetime. However, the scheduler also fails to invalidate shuffle files when the shuffle service itself is lost (because the whole slave was lost). This can cause long hangs when slaves are lost, since the file loss is not detected until a subsequent stage attempts to read the shuffle files.

The proposed fix is to also invalidate shuffle files when an executor is lost due to a `SlaveLost` event.

## How was this patch tested?

Unit tests; also verified on an actual cluster that slave loss invalidates shuffle files immediately, as expected.

cc mateiz

Author: Eric Liang <[email protected]>

Closes #14931 from ericl/sc-4439.
1 parent 76ad89e commit 649fa4b
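At a high level, the patch threads a `workerLost` flag from the standalone Master through the app client and scheduler backend into the DAGScheduler, where it becomes a `filesLost` decision. Below is a minimal, self-contained sketch of that decision, with class shapes condensed from the diffs that follow; it is illustrative code, not the actual Spark classes.

```scala
// Illustrative stand-ins; the real definitions are in the diffs below.
abstract class ExecutorLossReason(val message: String)
case class SlaveLost(msg: String = "Slave lost", workerLost: Boolean = false)
  extends ExecutorLossReason(msg)
case class ExecutorExited(code: Int, msg: String) extends ExecutorLossReason(msg)

// Backend side: the Master's workerLost flag becomes part of the loss reason.
def toLossReason(
    message: String, exitStatus: Option[Int], workerLost: Boolean): ExecutorLossReason =
  exitStatus match {
    case Some(code) => ExecutorExited(code, message)
    case None => SlaveLost(message, workerLost = workerLost)
  }

// DAGScheduler side: shuffle output is unregistered when the files are
// confirmed lost (whole slave gone, shuffle service included) or when the
// executor served its own blocks (no external shuffle service).
def shuffleFilesLost(reason: ExecutorLossReason, externalShuffleService: Boolean): Boolean = {
  val filesLost = reason match {
    case SlaveLost(_, true) => true
    case _ => false
  }
  filesLost || !externalShuffleService
}
```

A FetchFailed takes a shortcut: the DAGScheduler calls `handleExecutorLost(..., filesLost = true)` directly, so fetch failures still invalidate shuffle files regardless of the loss reason.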

File tree

12 files changed (+92, -31 lines)


core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ private[deploy] object DeployMessages {
   }
 
   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int])
+    exitStatus: Option[Int], workerLost: Boolean)
 
   case class ApplicationRemoved(message: String)

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 2 additions & 2 deletions
@@ -174,12 +174,12 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
-      case ExecutorUpdated(id, state, message, exitStatus) =>
+      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         }
 
       case MasterChanged(masterRef, masterWebUiUrl) =>

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

Lines changed: 2 additions & 1 deletion
@@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener {
   def executorAdded(
       fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
 
-  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+  def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 2 deletions
@@ -252,7 +252,7 @@ private[deploy] class Master(
         appInfo.resetRetryCount()
       }
 
-      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
 
       if (ExecutorState.isFinished(state)) {
         // Remove this executor from the worker and app
@@ -766,7 +766,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 15 additions & 9 deletions
@@ -239,8 +239,8 @@ class DAGScheduler(
   /**
    * Called by TaskScheduler implementation when an executor fails.
    */
-  def executorLost(execId: String): Unit = {
-    eventProcessLoop.post(ExecutorLost(execId))
+  def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
+    eventProcessLoop.post(ExecutorLost(execId, reason))
   }
 
   /**
@@ -1281,7 +1281,7 @@ class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
         }
       }
 
@@ -1306,23 +1306,25 @@ class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
-   * occurred, in which case we presume all shuffle data related to this executor to be lost.
+   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
+   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
+   * presume all shuffle data related to this executor to be lost.
    *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      fetchFailed: Boolean,
+      filesLost: Boolean,
       maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
 
-      if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
+        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
         // TODO: This will be really slow if we keep accumulating shuffle map stages
         for ((shuffleId, stage) <- shuffleIdToMapStage) {
           stage.removeOutputsOnExecutor(execId)
@@ -1624,8 +1626,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorAdded(execId, host) =>
       dagScheduler.handleExecutorAdded(execId, host)
 
-    case ExecutorLost(execId) =>
-      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
+    case ExecutorLost(execId, reason) =>
+      val filesLost = reason match {
+        case SlaveLost(_, true) => true
+        case _ => false
+      }
+      dagScheduler.handleExecutorLost(execId, filesLost)
 
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
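To make the dispatch above concrete, here is a tiny behavior check using stand-in types with the same shape as `SlaveLost` (illustrative only, not the Spark classes):

```scala
// Stand-in types; only the pattern-match semantics are being demonstrated.
sealed trait Reason
case class SlaveLost(message: String = "Slave lost", workerLost: Boolean = false) extends Reason
case object ExecutorKilled extends Reason

def filesLost(r: Reason): Boolean = r match {
  case SlaveLost(_, true) => true  // whole slave lost: shuffle service gone too
  case _ => false                  // executor-only loss: external service may survive
}

assert(filesLost(SlaveLost("worker lost", workerLost = true)))  // invalidate outputs
assert(!filesLost(SlaveLost("heartbeat timeout")))              // keep files served
assert(!filesLost(ExecutorKilled))                              // keep files served
```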

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 1 deletion
@@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent(
 
 private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
 
-private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
+  extends DAGSchedulerEvent
 
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])

core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala

Lines changed: 5 additions & 1 deletion
@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed
  */
 private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
 
+/**
+ * @param _message human readable loss reason
+ * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ */
 private[spark]
-case class SlaveLost(_message: String = "Slave lost")
+case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
   extends ExecutorLossReason(_message)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 6 additions & 3 deletions
@@ -346,15 +346,17 @@ private[spark] class TaskSchedulerImpl(
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
+    var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
 
           if (executorIdToTaskCount.contains(execId)) {
-            removeExecutor(execId,
+            reason = Some(
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
+            removeExecutor(execId, reason.get)
             failedExecutor = Some(execId)
           }
         }
@@ -387,7 +389,8 @@ private[spark] class TaskSchedulerImpl(
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      assert(reason.isDefined)
+      dagScheduler.executorLost(failedExecutor.get, reason.get)
       backend.reviveOffers()
     }
   }
@@ -513,7 +516,7 @@ private[spark] class TaskSchedulerImpl(
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
     if (failedExecutor.isDefined) {
-      dagScheduler.executorLost(failedExecutor.get)
+      dagScheduler.executorLost(failedExecutor.get, reason)
       backend.reviveOffers()
     }
   }
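The reshuffling above preserves an existing idiom worth noting: the loss reason is captured while holding the `TaskSchedulerImpl` lock, but `dagScheduler.executorLost` is called only after releasing it, since calling out under the lock can deadlock. A hedged sketch of the idiom, with hypothetical names:

```scala
// Hypothetical names; sketches the lock-scoping pattern used in statusUpdate.
def handleLostTask(
    execId: String,
    lock: AnyRef,
    removeExecutor: (String, String) => Unit,      // must run under the lock
    notifyDagScheduler: (String, String) => Unit   // must run outside it
): Unit = {
  var failedExecutor: Option[String] = None
  var reason: Option[String] = None
  lock.synchronized {
    reason = Some(s"Task on executor $execId was lost")
    removeExecutor(execId, reason.get)
    failedExecutor = Some(execId)
  }
  // Outside the lock: safe to call into a component that takes its own locks.
  for (exec <- failedExecutor; r <- reason) {
    notifyDagScheduler(exec, r)
  }
}
```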

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -150,10 +150,11 @@ private[spark] class StandaloneSchedulerBackend(
         fullId, hostPort, cores, Utils.megabytesToString(memory)))
     }
 
-  override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+  override def executorRemoved(
+      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => SlaveLost(message)
+      case None => SlaveLost(message, workerLost = workerLost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)

core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -210,7 +210,8 @@ class AppClientSuite
       execAddedList.add(id)
     }
 
-    def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+    def executorRemoved(
+        id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
       execRemovedList.add(id)
     }
   }
