From 9887ef1dbcd39ab796c77239d5bba0da5e9ba3ab Mon Sep 17 00:00:00 2001 From: x00228947 Date: Wed, 21 Oct 2015 10:30:01 +0800 Subject: [PATCH 1/2] remove invalid code --- .../apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index f5d80bbcf355..cea010f54d8a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -108,7 +108,7 @@ public void insertAll(Iterator> records) throws IOException { final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be From 2dd4d1e78d1f9a7999d15ef9cb92d97c2f5ad0ea Mon Sep 17 00:00:00 2001 From: x00228947 Date: Wed, 21 Oct 2015 16:47:18 +0800 Subject: [PATCH 2/2] fix issue --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index cea010f54d8a..1dc4797b1928 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -143,7 +143,12 @@ public long[] writePartitionedFile( boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { - final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); + File file = partitionWriters[i].fileSegment().file(); + if (!file.exists()) { + lengths[i] = 0; + continue; + } + final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; try { lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); @@ -151,7 +156,7 @@ public long[] writePartitionedFile( } finally { Closeables.close(in, copyThrewException); } - if (!partitionWriters[i].fileSegment().file().delete()) { + if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); } }