@@ -24,7 +24,6 @@
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.storage.ShuffleBlockId;
import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
@@ -378,43 +377,44 @@ private long[] mergeSpillsWithFileStream(
logger.error("In partition: " + partition);
boolean copyThrewException = true;
ShufflePartitionWriter writer = null;
OutputStream partitionOutput;
try {
writer = mapWriter.getNextPartitionWriter();
// Shield the underlying output stream from close() and flush() calls, so we can close
// the higher level streams to make sure all data is really flushed and internal state is
// cleaned.
partitionOutput = writer.toStream();
// Here, we don't need to perform any metrics updates when spills.length < 2 because the
// bytes written to this output file would have already been counted as shuffle bytes written.
if (spills.length >= 2) {
partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
}
partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
if (compressionCodec != null) {
partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
}
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill);
OutputStream partitionOutput = null;
try {
partitionOutput = new CloseShieldOutputStream(writer.toStream());
// Here, we don't need to perform any metrics updates when spills.length < 2 because the
// bytes written to this output file would have already been counted as shuffle bytes written.
if (spills.length >= 2) {
partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
}
partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
if (compressionCodec != null) {
partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
}
for (int i = 0; i < spills.length; i++) {
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
logger.error("PartitionLengthsInSpill: " + partitionLengthInSpill);

if (partitionLengthInSpill > 0) {
InputStream partitionInputStream = null;
try {
partitionInputStream = new LimitedInputStream(spillInputStreams[i],
partitionLengthInSpill, false);
partitionInputStream = blockManager.serializerManager().wrapForEncryption(
partitionInputStream);
if (compressionCodec != null) {
partitionInputStream = compressionCodec.compressedInputStream(
partitionInputStream);
if (partitionLengthInSpill > 0) {
InputStream partitionInputStream = null;
try {
partitionInputStream = new LimitedInputStream(spillInputStreams[i],
partitionLengthInSpill, false);
partitionInputStream = blockManager.serializerManager().wrapForEncryption(
partitionInputStream);
if (compressionCodec != null) {
partitionInputStream = compressionCodec.compressedInputStream(
partitionInputStream);
}
ByteStreams.copy(partitionInputStream, partitionOutput);
} finally {
partitionInputStream.close();
}
ByteStreams.copy(partitionInputStream, partitionOutput);
} finally {
partitionInputStream.close();
}
copyThrewException = false;
}
copyThrewException = false;
} finally {
Closeables.close(partitionOutput, copyThrewException);
}
} finally {
logger.error("Closing the writer");
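For context on the hunk above: the merge loop stacks wrappers over each partition's output stream (a close shield, then optional time-tracking metrics, then encryption, then compression), so closing the outermost stream flushes every layer without closing the stream handed out by the partition writer, which is closed separately in the outer finally block. Below is a minimal, self-contained sketch of that close-shielding idea using only JDK classes; it is an illustration, not the `CloseShieldOutputStream` or the Spark APIs used in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Sketch only: a wrapper whose close() flushes but never closes the wrapped stream,
// so byte-transforming layers stacked on top (compression, encryption, metrics)
// can be closed per partition while the underlying stream keeps accepting data.
final class CloseShieldSketch extends FilterOutputStream {
  CloseShieldSketch(OutputStream out) {
    super(out);
  }

  @Override
  public void close() throws IOException {
    flush(); // push buffered bytes down, but leave the wrapped stream open
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream underlying = new ByteArrayOutputStream();
    // Wrapping order mirrors the diff: shield the target first, then layer the
    // stream that needs a real close() to finalize its internal state.
    OutputStream partitionOutput = new GZIPOutputStream(new CloseShieldSketch(underlying));
    partitionOutput.write("spill bytes for one partition".getBytes(StandardCharsets.UTF_8));
    partitionOutput.close();          // writes the gzip trailer; `underlying` stays open
    underlying.write(new byte[]{42}); // still usable after the wrappers are closed
    underlying.close();
  }
}
```

Closing the compression layer per partition finalizes that partition's bytes while the shielded target remains open, which is the property the code above relies on when it calls `Closeables.close(partitionOutput, ...)` for each partition before closing the writer itself.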
@@ -198,9 +198,10 @@ public long getNumBytesWritten() {
}

@Override
public void close() throws IOException {
public void close() {
log.error("Do I have an outputTempFile: " + outputTempFile.exists());
if (stream != null) {
// Closing is a no-op.
stream.close();
}
partitionLengths[partitionId] = getNumBytesWritten();
@@ -230,18 +231,10 @@ public void write(byte[] buf, int pos, int length) throws IOException {
}

@Override
public void close() throws IOException {
flush();
public void close() {
isClosed = true;
}

@Override
public void flush() throws IOException {
if (!isClosed) {
outputBufferedFileStream.flush();
}
}

private void verifyNotClosed() {
if (isClosed) {
throw new IllegalStateException("Attempting to write to a closed block output stream.");
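Read together with the merge path earlier in the diff, the contract sketched by these hunks is that a per-partition stream's close() does not close or flush the shared underlying file stream; it only marks the partition as finished so its byte count can be recorded into `partitionLengths[partitionId]`, and the file itself is closed once after all partitions are written. A rough, hypothetical sketch of that contract (plain JDK types, not the classes modified above):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch only: a per-partition view over one shared stream. close() is a no-op on
// the shared stream; it just stops further writes and freezes the byte count.
final class PartitionStreamSketch extends OutputStream {
  private final OutputStream shared;
  private long bytesWritten;
  private boolean closed;

  PartitionStreamSketch(OutputStream shared) {
    this.shared = shared;
  }

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IllegalStateException("Attempting to write to a closed block output stream.");
    }
    shared.write(b);
    bytesWritten++;
  }

  @Override
  public void close() {
    closed = true; // the shared stream is deliberately left open
  }

  long getNumBytesWritten() {
    return bytesWritten;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream file = new ByteArrayOutputStream();
    PartitionStreamSketch p0 = new PartitionStreamSketch(file);
    p0.write(new byte[]{1, 2, 3});
    p0.close();
    PartitionStreamSketch p1 = new PartitionStreamSketch(file);
    p1.write(new byte[]{4, 5});
    p1.close();
    System.out.println(p0.getNumBytesWritten() + ", " + p1.getNumBytesWritten()); // 3, 2
    file.close(); // the shared stream is closed exactly once, at the end
  }
}
```

Here `file.close()` happens exactly once at the end, which mirrors why the overridden `close()` in the hunk above drops the `flush()` call and simply sets `isClosed`.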