
Commit fd3928b

Do not unregister executor outputs unduly
1 parent 9883918 commit fd3928b

4 files changed: +33, -15 lines

4 files changed

+33
-15
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 3 additions & 1 deletion

@@ -178,6 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
         return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
       }
     } else {
+      logError("Missing all output locations for shuffle " + shuffleId)
       throw new MetadataFetchFailedException(
         shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
     }

@@ -348,7 +349,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     new ConcurrentHashMap[Int, Array[MapStatus]]
   }
 
-private[spark] object MapOutputTracker {
+private[spark] object MapOutputTracker extends Logging {
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will

@@ -381,6 +382,7 @@ private[spark] object MapOutputTracker {
     statuses.map {
       status =>
         if (status == null) {
+          logError("Missing an output location for shuffle " + shuffleId)
          throw new MetadataFetchFailedException(
            shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
        } else {
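
The MapOutputTracker change is small: the companion object now mixes in Logging so a missing map output can be logged by the process doing the conversion before the MetadataFetchFailedException is thrown. A minimal sketch of that pattern, using simplified stand-in types rather than Spark's actual classes:

// Simplified stand-ins for Spark's classes; names here are illustrative only.
trait Logging {
  def logError(msg: String): Unit = System.err.println("ERROR: " + msg)
}

class MetadataFetchFailedException(shuffleId: Int, reduceId: Int, msg: String)
  extends Exception(msg)

object MapStatusConverter extends Logging {
  // Mirrors the convertMapStatuses change: log the missing location before
  // throwing, so the problem shows up in the logs of whichever process did
  // the conversion, not only in the task's propagated exception.
  def convert(shuffleId: Int, reduceId: Int, statuses: Array[String]): Array[String] =
    statuses.map { status =>
      if (status == null) {
        logError("Missing an output location for shuffle " + shuffleId)
        throw new MetadataFetchFailedException(
          shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
      } else {
        status
      }
    }
}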

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 12 deletions

@@ -1091,7 +1091,7 @@ class DAGScheduler(
 
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
+          handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
         }
 
       case ExceptionFailure(className, description, stackTrace, metrics) =>

@@ -1111,25 +1111,35 @@
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
+   * We will also assume that we've lost all shuffle blocks associated with the executor if the
+   * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
+   * occurred, in which case we presume all shuffle data related to this executor to be lost.
+   *
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+  private[scheduler] def handleExecutorLost(
+      execId: String,
+      fetchFailed: Boolean,
+      maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-      // TODO: This will be really slow if we keep accumulating shuffle map stages
-      for ((shuffleId, stage) <- shuffleToMapStage) {
-        stage.removeOutputsOnExecutor(execId)
-        val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
-        mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
-      }
-      if (shuffleToMapStage.isEmpty) {
-        mapOutputTracker.incrementEpoch()
+
+      if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+        // TODO: This will be really slow if we keep accumulating shuffle map stages
+        for ((shuffleId, stage) <- shuffleToMapStage) {
+          stage.removeOutputsOnExecutor(execId)
+          val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+          mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
+        }
+        if (shuffleToMapStage.isEmpty) {
+          mapOutputTracker.incrementEpoch()
+        }
+        clearCacheLocs()
       }
-      clearCacheLocs()
     } else {
       logDebug("Additional executor lost message for " + execId +
         "(epoch " + currentEpoch + ")")

@@ -1387,7 +1397,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
       dagScheduler.handleExecutorAdded(execId, host)
 
     case ExecutorLost(execId) =>
-      dagScheduler.handleExecutorLost(execId)
+      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
 
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
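
For reference, a minimal sketch of the decision the modified handleExecutorLost now makes; the helper name below is illustrative, not part of the patch. Map outputs on the lost executor are unregistered only if the executor served its own shuffle files (no external shuffle service), or if a FetchFailed showed the data really is gone.

// Illustrative helper, not part of the patch: the condition guarding the
// unregister-and-reregister loop in the new handleExecutorLost.
object UnregisterDecision {
  def shouldUnregisterOutputs(externalShuffleServiceEnabled: Boolean, fetchFailed: Boolean): Boolean =
    !externalShuffleServiceEnabled || fetchFailed

  def main(args: Array[String]): Unit = {
    // ExecutorLost alone, external shuffle service running: outputs are kept.
    assert(!shouldUnregisterOutputs(externalShuffleServiceEnabled = true, fetchFailed = false))
    // A FetchFailed unregisters even with the service enabled: the data is presumed lost.
    assert(shouldUnregisterOutputs(externalShuffleServiceEnabled = true, fetchFailed = true))
    // Without the external service, plain executor loss still unregisters, as before this commit.
    assert(shouldUnregisterOutputs(externalShuffleServiceEnabled = false, fetchFailed = false))
  }
}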

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 5 additions & 0 deletions

@@ -102,6 +102,11 @@ private[spark] class Stage(
     }
   }
 
+  /**
+   * Removes all shuffle outputs associated with this executor. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
   def removeOutputsOnExecutor(execId: String) {
     var becameUnavailable = false
     for (partition <- 0 until numPartitions) {
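
The added scaladoc is the substance of this change. For readers outside the codebase, here is a hedged sketch of what removeOutputsOnExecutor does with a stage's outputLocs, using simplified stand-in types (the real class tracks MapStatus entries keyed by the block manager's executorId):

// Simplified sketch of Stage.removeOutputsOnExecutor; types are stand-ins, not Spark's.
case class MapOutput(execId: String)

class StageOutputs(numPartitions: Int) {
  // One list of recorded output locations per map partition.
  val outputLocs: Array[List[MapOutput]] = Array.fill(numPartitions)(Nil)

  def removeOutputsOnExecutor(execId: String): Unit = {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.execId == execId)
      outputLocs(partition) = newList
      if (prevList.nonEmpty && newList.isEmpty) {
        becameUnavailable = true
      }
    }
    if (becameUnavailable) {
      // Matches the intent of the real code: the stage lost every output for some partition.
      println(s"Stage lost all outputs for some partition(s) on executor $execId")
    }
  }
}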

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 3 additions & 2 deletions

@@ -687,10 +687,11 @@ private[spark] class TaskSetManager(
       addPendingTask(index, readding=true)
     }
 
-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
+    // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask]) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {
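
Both this guard and the DAGScheduler one key off env.blockManager.externalShuffleServiceEnabled. The wiring of that flag is not part of this diff; assuming the usual configuration, it is controlled by the spark.shuffle.service.enabled setting, as in this sketch:

import org.apache.spark.SparkConf

object ShuffleServiceConfExample {
  // Assumption: externalShuffleServiceEnabled reflects spark.shuffle.service.enabled;
  // that wiring is outside this diff.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("external-shuffle-service-example")
      // With the service on, shuffle files written by a lost executor remain fetchable,
      // so its ShuffleMapTasks are no longer re-enqueued on executor loss alone.
      .set("spark.shuffle.service.enabled", "true")
    println(conf.get("spark.shuffle.service.enabled"))
  }
}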
