Commit c07cbb3

ericl authored and JoshRosen committed
[SPARK-17371] Resubmitted shuffle outputs can get deleted by zombie map tasks
## What changes were proposed in this pull request?

It seems that old shuffle map tasks hanging around after a stage resubmit will delete intended shuffle output files on stop(), causing downstream stages to fail even after successful resubmit completion. This can happen easily if the prior map task is waiting for a network timeout when its stage is resubmitted.

This can cause unnecessary stage resubmits, sometimes multiple times as fetch fails cause a cascade of shuffle file invalidations, and confusing FetchFailure messages that report shuffle index files missing from the local disk.

Given that IndexShuffleBlockResolver commits data atomically, it seems unnecessary to ever delete committed task output: even in the rare case that a task is failed after it finishes committing shuffle output, it should be safe to retain that output.

## How was this patch tested?

Prior to the fix proposed in #14931, I was able to reproduce this behavior by killing slaves in the middle of a large shuffle. After this patch, stages were no longer resubmitted multiple times due to shuffle index loss.

cc JoshRosen vanzin

Author: Eric Liang <[email protected]>

Closes #14932 from ericl/dont-remove-committed-files.
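The race described in the message can be illustrated with a toy model. This is a sketch, not Spark code: `MapAttempt`, `outputSurvives`, and the `deleteOnFailure` flag are hypothetical names, and the temp-file-plus-rename commit is only an assumed stand-in for the atomic commit the message attributes to IndexShuffleBlockResolver. Two attempts of the same map task share one output path; the resubmitted attempt commits, then the zombie attempt stops with `success = false`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Toy model of the zombie-task race. Every name here is hypothetical. */
final class ZombieMapTaskSketch {

    /** Stand-in for one attempt of a shuffle map task. */
    static final class MapAttempt {
        private final Path committedOutput;    // shared by all attempts of the same map task
        private final boolean deleteOnFailure; // true models the behavior before this commit

        MapAttempt(Path committedOutput, boolean deleteOnFailure) {
            this.committedOutput = committedOutput;
            this.deleteOnFailure = deleteOnFailure;
        }

        /** Write to a private temp file, then rename it into place in one step. */
        void commit(String data) throws IOException {
            Path tmp = Files.createTempFile(committedOutput.getParent(), "shuffle", ".tmp");
            Files.write(tmp, data.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, committedOutput, StandardCopyOption.ATOMIC_MOVE);
        }

        /** Mirrors the shape of stop(success) in the writers touched by this commit. */
        void stop(boolean success) throws IOException {
            if (!success && deleteOnFailure) {
                // Old behavior: a failed attempt removes the shared output,
                // even if a different attempt was the one that committed it.
                Files.deleteIfExists(committedOutput);
            }
        }
    }

    /** Returns true if the resubmitted attempt's output is still present after the zombie stops. */
    static boolean outputSurvives(boolean deleteOnFailure) {
        try {
            Path dir = Files.createTempDirectory("shuffle-sketch");
            Path output = dir.resolve("shuffle_0_0_0.data");

            MapAttempt zombie = new MapAttempt(output, deleteOnFailure);
            MapAttempt resubmitted = new MapAttempt(output, deleteOnFailure);

            resubmitted.commit("partition bytes"); // the resubmitted attempt finishes first
            zombie.stop(false);                    // the zombie then fails, e.g. after a timeout

            boolean survived = Files.exists(output);
            Files.deleteIfExists(output);          // clean up the scratch files
            Files.deleteIfExists(dir);
            return survived;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("delete on failure, output survives: " + outputSurvives(true));
        System.out.println("keep on failure, output survives: " + outputSurvives(false));
    }
}
```

With `deleteOnFailure` set to `true` the committed file is gone after the zombie stops, which corresponds to the missing shuffle files reported downstream. With it set to `false`, as after this commit, the file remains.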
1 parent 175b434 commit c07cbb3

3 files changed: 0 additions and 5 deletions

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

0 additions & 1 deletion

@@ -238,7 +238,6 @@ public Option<MapStatus> stop(boolean success) {
         partitionWriters = null;
       }
     }
-    shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
     return None$.empty();
   }
 }

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

0 additions & 2 deletions

@@ -465,8 +465,6 @@ public Option<MapStatus> stop(boolean success) {
         }
         return Option.apply(mapStatus);
       } else {
-        // The map task failed, so delete our output data.
-        shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
         return Option.apply(null);
       }
     }

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

0 additions & 2 deletions

@@ -83,8 +83,6 @@ private[spark] class SortShuffleWriter[K, V, C](
       if (success) {
         return Option(mapStatus)
       } else {
-        // The map task failed, so delete our output data.
-        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
         return None
       }
     } finally {
