diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index f5d80bbcf355..1dc4797b1928 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -108,7 +108,7 @@ public void insertAll(Iterator> records) throws IOException { final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -143,7 +143,12 @@ public long[] writePartitionedFile( boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { - final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); + File file = partitionWriters[i].fileSegment().file(); + if (!file.exists()) { + lengths[i] = 0; + continue; + } + final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; try { lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); @@ -151,7 +156,7 @@ public long[] writePartitionedFile( } finally { Closeables.close(in, copyThrewException); } - if (!partitionWriters[i].fileSegment().file().delete()) { + if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); } }