From 2b968d3cea708879e77fb4bf6f19ac101f701688 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 8 Apr 2019 17:18:53 -0700
Subject: [PATCH] Always close the serializer stream. But don't close the
 partition stream returned by the partition writer.

---
 .../shuffle/sort/UnsafeShuffleWriter.java  | 64 +++++++++----------
 .../io/DefaultShuffleMapOutputWriter.java  | 13 +---
 2 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index ba0e208babad..d67caf1cbed5 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -24,7 +24,6 @@
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.spark.storage.ShuffleBlockId;
 import scala.Option;
 import scala.Product2;
 import scala.collection.JavaConverters;
@@ -378,43 +377,44 @@ private long[] mergeSpillsWithFileStream(
         logger.error("In partition: " + partition);
         boolean copyThrewExecption = true;
         ShufflePartitionWriter writer = null;
-        OutputStream partitionOutput;
         try {
           writer = mapWriter.getNextPartitionWriter();
-          // Shield the underlying output stream from close() and flush() calls, so we can close
-          // the higher level streams to make sure all data is really flushed and internal state is
-          // cleaned.
-          partitionOutput = writer.toStream();
-          // Here, we don't need to perform any metrics updates when spills.length < 2 because the
-          // bytes written to this output file would have already been counted as shuffle bytes written.
-          if (spills.length >= 2) {
-            partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
-          }
-          partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
-          if (compressionCodec != null) {
-            partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
-          }
-          for (int i = 0; i < spills.length; i++) {
-            final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-            logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill);
+          OutputStream partitionOutput = null;
+          try {
+            partitionOutput = new CloseShieldOutputStream(writer.toStream());
+            // Here, we don't need to perform any metrics updates when spills.length < 2 because the
+            // bytes written to this output file would have already been counted as shuffle bytes written.
+            if (spills.length >= 2) {
+              partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
+            }
+            partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
+            if (compressionCodec != null) {
+              partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
+            }
+            for (int i = 0; i < spills.length; i++) {
+              final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+              logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill);
 
-            if (partitionLengthInSpill > 0) {
-              InputStream partitionInputStream = null;
-              try {
-                partitionInputStream = new LimitedInputStream(spillInputStreams[i],
-                  partitionLengthInSpill, false);
-                partitionInputStream = blockManager.serializerManager().wrapForEncryption(
-                  partitionInputStream);
-                if (compressionCodec != null) {
-                  partitionInputStream = compressionCodec.compressedInputStream(
-                    partitionInputStream);
+              if (partitionLengthInSpill > 0) {
+                InputStream partitionInputStream = null;
+                try {
+                  partitionInputStream = new LimitedInputStream(spillInputStreams[i],
+                    partitionLengthInSpill, false);
+                  partitionInputStream = blockManager.serializerManager().wrapForEncryption(
+                    partitionInputStream);
+                  if (compressionCodec != null) {
+                    partitionInputStream = compressionCodec.compressedInputStream(
+                      partitionInputStream);
+                  }
+                  ByteStreams.copy(partitionInputStream, partitionOutput);
+                } finally {
+                  partitionInputStream.close();
                 }
-                ByteStreams.copy(partitionInputStream, partitionOutput);
-              } finally {
-                partitionInputStream.close();
               }
+              copyThrewExecption = false;
             }
-            copyThrewExecption = false;
+          } finally {
+            Closeables.close(partitionOutput, copyThrewExecption);
           }
         } finally {
           logger.error("Closing the writer");
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
index c4042868e8e6..c12a19cf3092 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java
@@ -198,9 +198,10 @@ public long getNumBytesWritten() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       log.error("Do I have an outputTempFile: " + outputTempFile.exists());
       if (stream != null) {
+        // Closing is a no-op.
         stream.close();
       }
       partitionLengths[partitionId] = getNumBytesWritten();
@@ -230,18 +231,10 @@ public void write(byte[] buf, int pos, int length) throws IOException {
     }
 
     @Override
-    public void close() throws IOException {
-      flush();
+    public void close() {
       isClosed = true;
     }
 
-    @Override
-    public void flush() throws IOException {
-      if (!isClosed) {
-        outputBufferedFileStream.flush();
-      }
-    }
-
     private void verifyNotClosed() {
       if (isClosed) {
         throw new IllegalStateException("Attempting to write to a closed block output stream.");
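
Note on the pattern above: wrapping writer.toStream() in a close-shield lets mergeSpillsWithFileStream close the compression and encryption wrappers (which forces them to flush buffered data and clean up internal state) without also closing the partition stream, which stays owned by the ShufflePartitionWriter. The sketch below shows that shielding pattern in isolation; the class name ShieldedOutputStream, the stand-in streams, and the main method are illustrative assumptions and are not part of the patch, which may instead rely on an existing utility such as Commons IO's CloseShieldOutputStream.

import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustrative close-shield: forwards writes and flushes, but turns close() into
// flush-only so higher-level wrappers can be closed without closing the target.
class ShieldedOutputStream extends FilterOutputStream {
  ShieldedOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void close() throws IOException {
    // Flush whatever reached the shield, but keep the underlying stream open;
    // its owner (the partition writer, in the patch) closes it later.
    flush();
  }
}

public class CloseShieldExample {
  public static void main(String[] args) throws IOException {
    // Stand-in for the per-partition stream returned by writer.toStream().
    ByteArrayOutputStream partitionStream = new ByteArrayOutputStream();
    // Stand-in for the compression/encryption wrappers layered on top of it.
    OutputStream wrapped = new GZIPOutputStream(new ShieldedOutputStream(partitionStream));
    wrapped.write("partition data".getBytes(StandardCharsets.UTF_8));
    // Closing the wrapper flushes all of its internal state through the shield...
    wrapped.close();
    // ...while the partition stream itself remains open for its owner to finish with.
    System.out.println("bytes received by partition stream: " + partitionStream.size());
  }
}

In the same spirit, Closeables.close(partitionOutput, copyThrewExecption) is Guava's exception-safe close: while the flag is still true (the copy has not yet completed), an IOException thrown during close is logged and swallowed so it cannot mask the original copy failure; once the copy succeeds the flag is false and a close failure propagates normally.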