@@ -1046,6 +1046,7 @@ class DAGScheduler(
10461046
10471047 case FetchFailed (bmAddress, shuffleId, mapId, reduceId) =>
10481048 val failedStage = stageIdToStage(task.stageId)
1049+ val mapStage = shuffleToMapStage(shuffleId)
10491050 // It is likely that we receive multiple FetchFailed for a single stage (because we have
10501051 // multiple tasks running concurrently on different executors). In that case, it is possible
10511052 // the fetch failure has already been handled by the scheduler.
@@ -1056,13 +1057,6 @@ class DAGScheduler(
10561057 logInfo(" Marking " + failedStage + " (" + failedStage.name +
10571058 " ) for resubmision due to a fetch failure" )
10581059
1059- // Mark the map whose fetch failed as broken in the map stage
1060- val mapStage = shuffleToMapStage(shuffleId)
1061- if (mapId != - 1 ) {
1062- mapStage.removeOutputLoc(mapId, bmAddress)
1063- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1064- }
1065-
10661060 logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
10671061 " ); marking it for resubmission" )
10681062 if (failedStages.isEmpty && eventProcessActor != null ) {
@@ -1076,6 +1070,13 @@ class DAGScheduler(
10761070 failedStages += failedStage
10771071 failedStages += mapStage
10781072 }
1073+
1074+ // Mark the map whose fetch failed as broken in the map stage
1075+ if (mapId != - 1 ) {
1076+ mapStage.removeOutputLoc(mapId, bmAddress)
1077+ mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1078+ }
1079+
10791080 // TODO: mark the executor as failed only if there were lots of fetch failures on it
10801081 if (bmAddress != null ) {
10811082 handleExecutorLost(bmAddress.executorId, Some (task.epoch))
0 commit comments