core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -57,6 +57,7 @@
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;

@@ -382,6 +383,7 @@ private void mergeSpillsWithFileStream(
         ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
         OutputStream partitionOutput = writer.openStream();
         try {
+          partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
           partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
           if (compressionCodec != null) {
             partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
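For context: TimeTrackingOutputStream decorates an OutputStream so that the time spent in each write, flush, and close call is counted toward the shuffle write metrics. Below is a minimal sketch of that decorator pattern, assuming only java.io; TimedOutputStream and MetricsReporter are hypothetical stand-ins for illustration, not Spark's actual classes.

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the metrics sink that the real class reports to.
interface MetricsReporter {
  void incWriteTime(long nanos);
}

// Times every operation against the wrapped stream and reports the elapsed
// nanoseconds to the metrics sink.
final class TimedOutputStream extends OutputStream {
  private final MetricsReporter metrics;
  private final OutputStream out;

  TimedOutputStream(MetricsReporter metrics, OutputStream out) {
    this.metrics = metrics;
    this.out = out;
  }

  @Override
  public void write(int b) throws IOException {
    long start = System.nanoTime();
    out.write(b);
    metrics.incWriteTime(System.nanoTime() - start);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    long start = System.nanoTime();
    out.write(b, off, len);
    metrics.incWriteTime(System.nanoTime() - start);
  }

  @Override
  public void flush() throws IOException {
    long start = System.nanoTime();
    out.flush();
    metrics.incWriteTime(System.nanoTime() - start);
  }

  @Override
  public void close() throws IOException {
    long start = System.nanoTime();
    out.close();
    metrics.incWriteTime(System.nanoTime() - start);
  }
}

Note that in both files the tracker wraps the raw partition stream beneath the encryption and compression wrappers, so the timed writes cover only the final bytes reaching the underlying stream, not the CPU cost of compressing or encrypting them.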
core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
@@ -21,7 +21,7 @@ import java.io.{Closeable, IOException, OutputStream}

 import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.api.ShufflePartitionWriter
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.{BlockId, TimeTrackingOutputStream}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.PairsWriter

@@ -39,6 +39,7 @@ private[spark] class ShufflePartitionPairsWriter(

   private var isClosed = false
   private var partitionStream: OutputStream = _
+  private var timeTrackingStream: OutputStream = _
   private var wrappedStream: OutputStream = _
   private var objOut: SerializationStream = _
   private var numRecordsWritten = 0
@@ -59,7 +60,8 @@
   private def open(): Unit = {
     try {
       partitionStream = partitionWriter.openStream
-      wrappedStream = serializerManager.wrapStream(blockId, partitionStream)
+      timeTrackingStream = new TimeTrackingOutputStream(writeMetrics, partitionStream)
+      wrappedStream = serializerManager.wrapStream(blockId, timeTrackingStream)
       objOut = serializerInstance.serializeStream(wrappedStream)
     } catch {
       case e: Exception =>
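To make that wrapping order concrete, here is a small self-contained demo of the same idea: the timing layer wraps the raw sink and the compressing layer wraps the timing layer, so only the post-compression byte writes are timed. Everything here is illustrative; GZIPOutputStream merely stands in for whatever serializerManager.wrapStream may layer on top.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public final class WrapOrderDemo {
  public static void main(String[] args) throws IOException {
    long[] writeNanos = {0L};
    OutputStream raw = new ByteArrayOutputStream(); // stands in for the partition stream
    // Minimal inline time tracker: times only the writes that reach the raw sink.
    OutputStream timed = new OutputStream() {
      @Override
      public void write(int b) throws IOException {
        long start = System.nanoTime();
        raw.write(b);
        writeNanos[0] += System.nanoTime() - start;
      }
    };
    // Compression sits on top of the tracker, so its CPU time is excluded.
    try (OutputStream wrapped = new GZIPOutputStream(timed)) {
      wrapped.write("some shuffle bytes".getBytes());
    }
    System.out.println("time spent in raw writes (ns): " + writeNanos[0]);
  }
}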
@@ -78,6 +80,7 @@
         // Setting these to null will prevent the underlying streams from being closed twice
         // just in case any stream's close() implementation is not idempotent.
         wrappedStream = null
+        timeTrackingStream = null
         partitionStream = null
       } {
         // Normally closing objOut would close the inner streams as well, but just in case there
@@ -86,9 +89,15 @@
           wrappedStream = closeIfNonNull(wrappedStream)
           // Same as above - if wrappedStream closes then assume it closes underlying
           // partitionStream and don't close again in the finally
+          timeTrackingStream = null
           partitionStream = null
         } {
-          partitionStream = closeIfNonNull(partitionStream)
+          Utils.tryWithSafeFinally {
+            timeTrackingStream = closeIfNonNull(timeTrackingStream)
+            partitionStream = null
+          } {
+            partitionStream = closeIfNonNull(partitionStream)
+          }
         }
       }
       updateBytesWritten()
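For readers unfamiliar with Utils.tryWithSafeFinally: it behaves like try/finally, except that an exception thrown by the finally block is suppressed in favor of the original one. Below is a rough Java equivalent of the nested close ordering above, using plain try/finally (so the suppression behavior differs slightly) and a hypothetical closeIfNonNull helper.

import java.io.IOException;
import java.io.OutputStream;

final class SafeCloseExample {
  private OutputStream wrappedStream;
  private OutputStream timeTrackingStream;
  private OutputStream partitionStream;

  // Hypothetical helper mirroring the Scala closeIfNonNull: close if set, return null.
  private static OutputStream closeIfNonNull(OutputStream s) throws IOException {
    if (s != null) {
      s.close();
    }
    return null;
  }

  void close() throws IOException {
    try {
      // Closing the outermost wrapper should close the inner streams too, so
      // null the inner references to avoid a second, possibly non-idempotent close.
      wrappedStream = closeIfNonNull(wrappedStream);
      timeTrackingStream = null;
      partitionStream = null;
    } finally {
      try {
        // The references are still non-null only if the close above threw.
        timeTrackingStream = closeIfNonNull(timeTrackingStream);
        partitionStream = null;
      } finally {
        // Last resort: make sure the raw partition stream is closed.
        partitionStream = closeIfNonNull(partitionStream);
      }
    }
  }
}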