From ca58a9920e05635f98d7cd5c78378beb473e00f9 Mon Sep 17 00:00:00 2001 From: mccheah Date: Tue, 16 Apr 2019 19:13:31 -0700 Subject: [PATCH 01/16] [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API. --- .../shuffle/sort/UnsafeShuffleWriter.java | 257 +++++++++--------- .../shuffle/sort/SortShuffleManager.scala | 4 +- .../sort/UnsafeShuffleWriterSuite.java | 56 ++-- 3 files changed, 171 insertions(+), 146 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 9d05f03613ce9..92a33f6a7dfce 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -17,11 +17,15 @@ package org.apache.spark.shuffle.sort; +import java.nio.channels.Channels; import javax.annotation.Nullable; import java.io.*; import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.util.Iterator; +import org.apache.spark.shuffle.api.ShuffleExecutorComponents; +import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -31,18 +35,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; -import com.google.common.io.Files; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.*; import org.apache.spark.annotation.Private; +import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; +import org.apache.spark.shuffle.api.ShufflePartitionWriter; import org.apache.spark.internal.config.package$; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.NioBufferedFileInputStream; -import org.apache.commons.io.output.CloseShieldOutputStream; -import org.apache.commons.io.output.CountingOutputStream; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.scheduler.MapStatus; @@ -50,10 +53,8 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.SerializerInstance; -import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockManager; -import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; @@ -65,15 +66,14 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private static final ClassTag OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); @VisibleForTesting - static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024; private final BlockManager blockManager; - private final IndexShuffleBlockResolver shuffleBlockResolver; private final TaskMemoryManager memoryManager; private final SerializerInstance serializer; private final Partitioner partitioner; private final ShuffleWriteMetricsReporter writeMetrics; + private final ShuffleExecutorComponents shuffleExecutorComponents; private final int shuffleId; private final int mapId; private final TaskContext taskContext; @@ -81,7 +81,6 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private final boolean transferToEnabled; private final int 
initialSortBufferSize; private final int inputBufferSizeInBytes; - private final int outputBufferSizeInBytes; @Nullable private MapStatus mapStatus; @Nullable private ShuffleExternalSorter sorter; @@ -103,27 +102,15 @@ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream */ private boolean stopping = false; - private class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream { - - CloseAndFlushShieldOutputStream(OutputStream outputStream) { - super(outputStream); - } - - @Override - public void flush() { - // do nothing - } - } - public UnsafeShuffleWriter( BlockManager blockManager, - IndexShuffleBlockResolver shuffleBlockResolver, TaskMemoryManager memoryManager, SerializedShuffleHandle handle, int mapId, TaskContext taskContext, SparkConf sparkConf, - ShuffleWriteMetricsReporter writeMetrics) throws IOException { + ShuffleWriteMetricsReporter writeMetrics, + ShuffleExecutorComponents shuffleExecutorComponents) { final int numPartitions = handle.dependency().partitioner().numPartitions(); if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) { throw new IllegalArgumentException( @@ -132,7 +119,6 @@ public UnsafeShuffleWriter( " reduce partitions"); } this.blockManager = blockManager; - this.shuffleBlockResolver = shuffleBlockResolver; this.memoryManager = memoryManager; this.mapId = mapId; final ShuffleDependency dep = handle.dependency(); @@ -140,6 +126,7 @@ public UnsafeShuffleWriter( this.serializer = dep.serializer().newInstance(); this.partitioner = dep.partitioner(); this.writeMetrics = writeMetrics; + this.shuffleExecutorComponents = shuffleExecutorComponents; this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); @@ -147,8 +134,6 @@ public UnsafeShuffleWriter( (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()); this.inputBufferSizeInBytes = (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; - this.outputBufferSizeInBytes = - (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024; open(); } @@ -230,24 +215,31 @@ void closeAndWriteOutput() throws IOException { serOutputStream = null; final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; + final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents + .createMapOutputWriter( + shuffleId, + mapId, + taskContext.taskAttemptId(), + partitioner.numPartitions()); final long[] partitionLengths; - final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); - final File tmp = Utils.tempFileWith(output); try { try { - partitionLengths = mergeSpills(spills, tmp); + partitionLengths = mergeSpills(spills, mapWriter); } finally { for (SpillInfo spill : spills) { - if (spill.file.exists() && ! 
spill.file.delete()) { + if (spill.file.exists() && !spill.file.delete()) { logger.error("Error while deleting spill file {}", spill.file.getPath()); } } } - shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); - } finally { - if (tmp.exists() && !tmp.delete()) { - logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); + mapWriter.commitAllPartitions(); + } catch (Exception e) { + try { + mapWriter.abort(e); + } catch (Exception innerE) { + logger.error("Failed to abort the Map Output Writer", innerE); } + throw e; } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -281,7 +273,8 @@ void forceSorterToSpill() throws IOException { * * @return the partition lengths in the merged file. */ - private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException { + private long[] mergeSpills(SpillInfo[] spills, + ShuffleMapOutputWriter mapWriter) throws IOException { final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS()); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = @@ -289,17 +282,12 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti final boolean fastMergeIsSupported = !compressionEnabled || CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); + final int numPartitions = partitioner.numPartitions(); + long[] partitionLengths = new long[numPartitions]; try { if (spills.length == 0) { - new FileOutputStream(outputFile).close(); // Create an empty file - return new long[partitioner.numPartitions()]; - } else if (spills.length == 1) { - // Here, we don't need to perform any metrics updates because the bytes written to this - // output file would have already been counted as shuffle bytes written. - Files.move(spills[0].file, outputFile); - return spills[0].partitionLengths; + return partitionLengths; } else { - final long[] partitionLengths; // There are multiple spills to merge, so none of these spill files' lengths were counted // towards our shuffle write count or shuffle write time. If we use the slow merge path, // then the final output file's size won't necessarily be equal to the sum of the spill @@ -316,14 +304,14 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti // that doesn't need to interpret the spilled bytes. 
if (transferToEnabled && !encryptionEnabled) { logger.debug("Using transferTo-based fast merge"); - partitionLengths = mergeSpillsWithTransferTo(spills, outputFile); + partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); } else { logger.debug("Using fileStream-based fast merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); } } else { logger.debug("Using slow merge"); - partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has // in-memory records, we write out the in-memory records to a file but do not count that @@ -331,13 +319,9 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti // to be counted as shuffle write, but this will lead to double-counting of the final // SpillInfo's bytes. writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); - writeMetrics.incBytesWritten(outputFile.length()); return partitionLengths; } } catch (IOException e) { - if (outputFile.exists() && !outputFile.delete()) { - logger.error("Unable to delete output file {}", outputFile.getPath()); - } throw e; } } @@ -345,73 +329,75 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti /** * Merges spill files using Java FileStreams. This code path is typically slower than * the NIO-based merge, {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], - * File)}, and it's mostly used in cases where the IO compression codec does not support - * concatenation of compressed data, when encryption is enabled, or when users have - * explicitly disabled use of {@code transferTo} in order to work around kernel bugs. + * ShuffleMapOutputWriter)}, and it's mostly used in cases where the IO compression codec + * does not support concatenation of compressed data, when encryption is enabled, or when + * users have explicitly disabled use of {@code transferTo} in order to work around kernel bugs. * This code path might also be faster in cases where individual partition size in a spill * is small and UnsafeShuffleWriter#mergeSpillsWithTransferTo method performs many small * disk ios which is inefficient. In those case, Using large buffers for input and output * files helps reducing the number of disk ios, making the file merging faster. * * @param spills the spills to merge. - * @param outputFile the file to write the merged data to. + * @param mapWriter the map output writer to use for output. * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled. * @return the partition lengths in the merged file. */ private long[] mergeSpillsWithFileStream( SpillInfo[] spills, - File outputFile, + ShuffleMapOutputWriter mapWriter, @Nullable CompressionCodec compressionCodec) throws IOException { - assert (spills.length >= 2); final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; - final OutputStream bos = new BufferedOutputStream( - new FileOutputStream(outputFile), - outputBufferSizeInBytes); - // Use a counting output stream to avoid having to close the underlying file and ask - // the file system for its size after each partition is written. 
- final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos); - boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( - spills[i].file, - inputBufferSizeInBytes); + spills[i].file, + inputBufferSizeInBytes); } for (int partition = 0; partition < numPartitions; partition++) { - final long initialFileLength = mergedFileOutputStream.getByteCount(); - // Shield the underlying output stream from close() and flush() calls, so that we can close - // the higher level streams to make sure all data is really flushed and internal state is - // cleaned. - OutputStream partitionOutput = new CloseAndFlushShieldOutputStream( - new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream)); - partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); - if (compressionCodec != null) { - partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); - } - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - if (partitionLengthInSpill > 0) { - InputStream partitionInputStream = new LimitedInputStream(spillInputStreams[i], - partitionLengthInSpill, false); - try { - partitionInputStream = blockManager.serializerManager().wrapForEncryption( - partitionInputStream); - if (compressionCodec != null) { - partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); + boolean copyThrewExecption = true; + ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition); + OutputStream partitionOutput = writer.openStream(); + try { + // Shield the underlying output stream from close() calls, so that we can close the + // higher level streams to make sure all data is really flushed and internal state + // is cleaned + partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); + if (compressionCodec != null) { + partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); + } + for (int i = 0; i < spills.length; i++) { + final long partitionLengthInSpill = spills[i].partitionLengths[partition]; + + if (partitionLengthInSpill > 0) { + InputStream partitionInputStream = null; + boolean copySpillThrewException = true; + try { + partitionInputStream = new LimitedInputStream(spillInputStreams[i], + partitionLengthInSpill, false); + partitionInputStream = blockManager.serializerManager().wrapForEncryption( + partitionInputStream); + if (compressionCodec != null) { + partitionInputStream = compressionCodec.compressedInputStream( + partitionInputStream); + } + ByteStreams.copy(partitionInputStream, partitionOutput); + copySpillThrewException = false; + } finally { + Closeables.close(partitionInputStream, copySpillThrewException); } - ByteStreams.copy(partitionInputStream, partitionOutput); - } finally { - partitionInputStream.close(); } } + copyThrewExecption = false; + } finally { + Closeables.close(partitionOutput, copyThrewExecption); } - partitionOutput.flush(); - partitionOutput.close(); - partitionLengths[partition] = (mergedFileOutputStream.getByteCount() - initialFileLength); + long numBytesWritten = writer.getNumBytesWritten(); + partitionLengths[partition] = numBytesWritten; + writeMetrics.incBytesWritten(numBytesWritten); } threwException = false; } finally { @@ -420,7 +406,6 @@ private long[] mergeSpillsWithFileStream( for (InputStream stream : spillInputStreams) { Closeables.close(stream, threwException); } - 
Closeables.close(mergedFileOutputStream, threwException); } return partitionLengths; } @@ -430,54 +415,49 @@ private long[] mergeSpillsWithFileStream( * This is only safe when the IO compression codec and serializer support concatenation of * serialized streams. * + * @param spills the spills to merge. + * @param mapWriter the map output writer to use for output. * @return the partition lengths in the merged file. */ - private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException { - assert (spills.length >= 2); + private long[] mergeSpillsWithTransferTo( + SpillInfo[] spills, + ShuffleMapOutputWriter mapWriter) throws IOException { final int numPartitions = partitioner.numPartitions(); final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; final long[] spillInputChannelPositions = new long[spills.length]; - FileChannel mergedFileOutputChannel = null; boolean threwException = true; try { for (int i = 0; i < spills.length; i++) { spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); } - // This file needs to opened in append mode in order to work around a Linux kernel bug that - // affects transferTo; see SPARK-3948 for more details. - mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel(); - - long bytesWrittenToMergedFile = 0; for (int partition = 0; partition < numPartitions; partition++) { - for (int i = 0; i < spills.length; i++) { - final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - final FileChannel spillInputChannel = spillInputChannels[i]; - final long writeStartTime = System.nanoTime(); - Utils.copyFileStreamNIO( - spillInputChannel, - mergedFileOutputChannel, - spillInputChannelPositions[i], - partitionLengthInSpill); - spillInputChannelPositions[i] += partitionLengthInSpill; - writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); - bytesWrittenToMergedFile += partitionLengthInSpill; - partitionLengths[partition] += partitionLengthInSpill; + boolean copyThrewExecption = true; + ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition); + WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper() + .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer))); + try { + for (int i = 0; i < spills.length; i++) { + long partitionLengthInSpill = 0L; + partitionLengthInSpill += spills[i].partitionLengths[partition]; + final FileChannel spillInputChannel = spillInputChannels[i]; + final long writeStartTime = System.nanoTime(); + Utils.copyFileStreamNIO( + spillInputChannel, + resolvedChannel.channel(), + spillInputChannelPositions[i], + partitionLengthInSpill); + copyThrewExecption = false; + spillInputChannelPositions[i] += partitionLengthInSpill; + writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); + } + } finally { + Closeables.close(resolvedChannel, copyThrewExecption); } - } - // Check the position after transferTo loop to see if it is in the right position and raise an - // exception if it is incorrect. The position will not be increased to the expected length - // after calling transferTo in kernel version 2.6.32. This issue is described at - // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948. 
- if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) { - throw new IOException( - "Current position " + mergedFileOutputChannel.position() + " does not equal expected " + - "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" + - " version to see if it is 2.6.32, as there is a kernel bug which will lead to " + - "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " + - "to disable this NIO feature." - ); + long numBytes = writer.getNumBytesWritten(); + partitionLengths[partition] = numBytes; + writeMetrics.incBytesWritten(numBytes); } threwException = false; } finally { @@ -487,7 +467,6 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th assert(spillInputChannelPositions[i] == spills[i].file.length()); Closeables.close(spillInputChannels[i], threwException); } - Closeables.close(mergedFileOutputChannel, threwException); } return partitionLengths; } @@ -518,4 +497,30 @@ public Option stop(boolean success) { } } } + + private static OutputStream openStreamUnchecked(ShufflePartitionWriter writer) { + try { + return writer.openStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static final class StreamFallbackChannelWrapper implements WritableByteChannelWrapper { + private final WritableByteChannel channel; + + StreamFallbackChannelWrapper(OutputStream fallbackStream) { + this.channel = Channels.newChannel(fallbackStream); + } + + @Override + public WritableByteChannel channel() { + return channel; + } + + @Override + public void close() throws IOException { + channel.close(); + } + } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 17719f516a0a1..03314bd081fd7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -140,13 +140,13 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, - shuffleBlockResolver, context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf, - metrics) + metrics, + shuffleExecutorComponents) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 88125a6b93ade..21087ae2614fc 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -19,8 +19,11 @@ import java.io.*; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.util.*; +import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents; +import org.mockito.stubbing.Answer; import scala.Option; import scala.Product2; import scala.Tuple2; @@ -39,6 +42,7 @@ import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; @@ -65,6 
+69,7 @@ public class UnsafeShuffleWriterSuite { + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; static final int NUM_PARTITITONS = 4; TestMemoryManager memoryManager; TaskMemoryManager taskMemoryManager; @@ -85,6 +90,7 @@ public class UnsafeShuffleWriterSuite { @After public void tearDown() { + TaskContext$.MODULE$.unset(); Utils.deleteRecursively(tempDir); final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory(); if (leakedMemory != 0) { @@ -132,14 +138,28 @@ public void setUp() throws IOException { }); when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); - doAnswer(invocationOnMock -> { + + Answer renameTempAnswer = invocationOnMock -> { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; File tmp = (File) invocationOnMock.getArguments()[3]; - mergedOutputFile.delete(); - tmp.renameTo(mergedOutputFile); + if (!mergedOutputFile.delete()) { + throw new RuntimeException("Failed to delete old merged output file."); + } + if (tmp != null) { + Files.move(tmp.toPath(), mergedOutputFile.toPath()); + } else if (!mergedOutputFile.createNewFile()) { + throw new RuntimeException("Failed to create empty merged output file."); + } return null; - }).when(shuffleBlockResolver) - .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class)); + }; + + doAnswer(renameTempAnswer) + .when(shuffleBlockResolver) + .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class)); + + doAnswer(renameTempAnswer) + .when(shuffleBlockResolver) + .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), eq(null)); when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> { TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID()); @@ -151,21 +171,22 @@ public void setUp() throws IOException { when(taskContext.taskMetrics()).thenReturn(taskMetrics); when(shuffleDep.serializer()).thenReturn(serializer); when(shuffleDep.partitioner()).thenReturn(hashPartitioner); + when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); + + TaskContext$.MODULE$.setTaskContext(taskContext); } - private UnsafeShuffleWriter createWriter( - boolean transferToEnabled) throws IOException { + private UnsafeShuffleWriter createWriter(boolean transferToEnabled) { conf.set("spark.file.transferTo", String.valueOf(transferToEnabled)); - return new UnsafeShuffleWriter<>( + return new UnsafeShuffleWriter( blockManager, - shuffleBlockResolver, - taskMemoryManager, + taskMemoryManager, new SerializedShuffleHandle<>(0, 1, shuffleDep), 0, // map id taskContext, conf, - taskContext.taskMetrics().shuffleWriteMetrics() - ); + taskContext.taskMetrics().shuffleWriteMetrics(), + new LocalDiskShuffleExecutorComponents(conf, blockManager, shuffleBlockResolver)); } private void assertSpillFilesWereCleanedUp() { @@ -444,10 +465,10 @@ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() thro } private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { - memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); + memoryManager.limit(DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter writer = createWriter(false); final ArrayList> dataToWrite = new ArrayList<>(); - for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { + for (int i = 0; i < DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2<>(i, i)); } writer.write(dataToWrite.iterator()); @@ -516,16 +537,15 
@@ public void testPeakMemoryUsed() throws Exception { final long numRecordsPerPage = pageSizeBytes / recordLengthBytes; taskMemoryManager = spy(taskMemoryManager); when(taskMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes); - final UnsafeShuffleWriter writer = - new UnsafeShuffleWriter<>( + final UnsafeShuffleWriter writer = new UnsafeShuffleWriter( blockManager, - shuffleBlockResolver, taskMemoryManager, new SerializedShuffleHandle<>(0, 1, shuffleDep), 0, // map id taskContext, conf, - taskContext.taskMetrics().shuffleWriteMetrics()); + taskContext.taskMetrics().shuffleWriteMetrics(), + new LocalDiskShuffleExecutorComponents(conf, blockManager, shuffleBlockResolver)); // Peak memory should be monotonically increasing. More specifically, every time // we allocate a new page it should increase by exactly the size of the page. From 0ee9311a6647c87f5a1a4f83b0a2557395679f47 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 30 Jul 2019 17:22:31 -0700 Subject: [PATCH 02/16] Don't do task context things --- .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 21087ae2614fc..a7a2e3f6186b6 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -90,7 +90,6 @@ public class UnsafeShuffleWriterSuite { @After public void tearDown() { - TaskContext$.MODULE$.unset(); Utils.deleteRecursively(tempDir); final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory(); if (leakedMemory != 0) { @@ -172,8 +171,6 @@ public void setUp() throws IOException { when(shuffleDep.serializer()).thenReturn(serializer); when(shuffleDep.partitioner()).thenReturn(hashPartitioner); when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); - - TaskContext$.MODULE$.setTaskContext(taskContext); } private UnsafeShuffleWriter createWriter(boolean transferToEnabled) { From 1d08d80f10b38b1ae94ecc3a09e98b5153025249 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 30 Jul 2019 17:26:27 -0700 Subject: [PATCH 03/16] Import sorting --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 4 ++-- .../apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 92a33f6a7dfce..1c839c55b5ea9 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -24,8 +24,6 @@ import java.nio.channels.WritableByteChannel; import java.util.Iterator; -import org.apache.spark.shuffle.api.ShuffleExecutorComponents; -import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -54,6 +52,8 @@ import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.ShuffleWriter; +import org.apache.spark.shuffle.api.ShuffleExecutorComponents; +import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; import 
org.apache.spark.util.Utils; diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index a7a2e3f6186b6..8a39d809f149c 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -22,7 +22,6 @@ import java.nio.file.Files; import java.util.*; -import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents; import org.mockito.stubbing.Answer; import scala.Option; import scala.Product2; @@ -42,7 +41,6 @@ import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; -import org.apache.spark.TaskContext$; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; @@ -57,6 +55,7 @@ import org.apache.spark.security.CryptoStreamUtils; import org.apache.spark.serializer.*; import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents; import org.apache.spark.storage.*; import org.apache.spark.util.Utils; From a02448c6bd82a03061efe4fb28cf2eb1bcb6e9b8 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 30 Jul 2019 17:46:19 -0700 Subject: [PATCH 04/16] Move more imports --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 1c839c55b5ea9..1cfd886301d6e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -38,8 +38,6 @@ import org.apache.spark.*; import org.apache.spark.annotation.Private; -import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; -import org.apache.spark.shuffle.api.ShufflePartitionWriter; import org.apache.spark.internal.config.package$; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; @@ -53,6 +51,8 @@ import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.shuffle.api.ShuffleExecutorComponents; +import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; +import org.apache.spark.shuffle.api.ShufflePartitionWriter; import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; From 86b2c4f2ee3b9156fd587f89862ae209ccba3ab3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 31 Jul 2019 11:12:13 -0700 Subject: [PATCH 05/16] Address comments --- .../shuffle/sort/UnsafeShuffleWriter.java | 3 --- .../io/LocalDiskShuffleMapOutputWriter.java | 21 +++++++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 1cfd886301d6e..9deb281a45ffe 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -362,9 +362,6 @@ private long[] mergeSpillsWithFileStream( ShufflePartitionWriter writer = 
mapWriter.getPartitionWriter(partition); OutputStream partitionOutput = writer.openStream(); try { - // Shield the underlying output stream from close() calls, so that we can close the - // higher level streams to make sure all data is really flushed and internal state - // is cleaned partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput); if (compressionCodec != null) { partitionOutput = compressionCodec.compressedOutputStream(partitionOutput); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index add4634a61fb5..7bf323ed849d7 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -24,8 +24,8 @@ import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; - import java.util.Optional; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +54,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter { private final int bufferSize; private int lastPartitionId = -1; private long currChannelPosition; + private long bytesWrittenToMergedFile = 0L; private final File outputFile; private File outputTempFile; @@ -94,9 +95,21 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I } return new LocalDiskShufflePartitionWriter(reducePartitionId); } - @Override public void commitAllPartitions() throws IOException { + // Check the position after transferTo loop to see if it is in the right position and raise a + // exception if it is incorrect. The position will not be increased to the expected length + // after calling transferTo in kernel version 2.6.32. This issue is described at + // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948. + if (outputFileChannel != null + && outputFileChannel.position() != bytesWrittenToMergedFile) { + throw new IOException( + "Current position " + outputFileChannel.position() + " does not equal expected " + + "position " + bytesWrittenToMergedFile + " after transferTo. Please check your " + + " kernel version to see if it is 2.6.32, as there is a kernel bug which will lead " + + "to unexpected behavior when using transferTo. You can set " + + "spark.file.transferTo=false to disable this NIO feature."); + } cleanUp(); File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null; blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp); @@ -124,6 +137,8 @@ private void cleanUp() throws IOException { private void initStream() throws IOException { if (outputFileStream == null) { + // This file needs to opened in append mode in order to work around a Linux kernel bug that + // affects transferTo; see SPARK-3948 for more details. 
outputFileStream = new FileOutputStream(outputTempFile, true); } if (outputBufferedFileStream == null) { @@ -226,6 +241,7 @@ public void write(byte[] buf, int pos, int length) throws IOException { public void close() { isClosed = true; partitionLengths[partitionId] = count; + bytesWrittenToMergedFile += count; } private void verifyNotClosed() { @@ -256,6 +272,7 @@ public WritableByteChannel channel() { @Override public void close() throws IOException { partitionLengths[partitionId] = getCount(); + bytesWrittenToMergedFile += partitionLengths[partitionId]; } } } From 91d7062761cd4843b2eadd7ded4ba0976ce0f516 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 2 Aug 2019 16:02:50 -0700 Subject: [PATCH 06/16] Put back optimization on single spill file --- .../api/ShuffleExecutorComponents.java | 31 +++- .../api/SingleFileShuffleMapOutputWriter.java | 36 +++++ .../shuffle/sort/UnsafeShuffleWriter.java | 144 ++++++++++-------- .../LocalDiskShuffleExecutorComponents.java | 14 ++ .../io/LocalDiskShuffleMapOutputWriter.java | 1 + .../LocalDiskSingleFileMapOutputWriter.java | 52 +++++++ 6 files changed, 213 insertions(+), 65 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java index 70c112b78911d..ddac85f074ef2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java @@ -19,6 +19,7 @@ import java.io.IOException; +import java.util.Optional; import org.apache.spark.annotation.Private; /** @@ -39,17 +40,39 @@ public interface ShuffleExecutorComponents { /** * Called once per map task to create a writer that will be responsible for persisting all the * partitioned bytes written by that map task. - * @param shuffleId Unique identifier for the shuffle the map task is a part of + * + * @param shuffleId Unique identifier for the shuffle the map task is a part of * @param mapId Within the shuffle, the identifier of the map task * @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task - * with the same (shuffleId, mapId) pair can be distinguished by the - * different values of mapTaskAttemptId. + * with the same (shuffleId, mapId) pair can be distinguished by the + * different values of mapTaskAttemptId. * @param numPartitions The number of partitions that will be written by the map task. Some of -* these partitions may be empty. + * these partitions may be empty. */ ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, int mapId, long mapTaskAttemptId, int numPartitions) throws IOException; + + /** + * An optional extension for creating a map output writer that can optimize the transfer of a + * single partition file, as the entire result of a map task, to the backing store. + *
<p>
+ * Most implementations should return the default {@link Optional#empty()} to indicate that + * they do not support this optimization. This primarily is for backwards-compatibility in + * preserving an optimization in the local disk shuffle storage implementation. + * + * @param shuffleId Unique identifier for the shuffle the map task is a part of + * @param mapId Within the shuffle, the identifier of the map task + * @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task + * with the same (shuffleId, mapId) pair can be distinguished by the + * different values of mapTaskAttemptId. + */ + default Optional createSingleFileMapOutputWriter( + int shuffleId, + int mapId, + long mapTaskAttemptId) throws IOException { + return Optional.empty(); + } } diff --git a/core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java new file mode 100644 index 0000000000000..d57db921afef6 --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.api; + +import java.io.File; + +import java.io.IOException; +import org.apache.spark.annotation.Private; + +/** + * Optional extension for partition writing that is optimized for transferring a single + * file to the backing store. + */ +@Private +public interface SingleFileShuffleMapOutputWriter { + + /** + * Transfer a file that contains the bytes of all the splits written by this map task. 
+ */ + void transferMapOutputFile(File mapOutputFile, long[] partitionLengths) throws IOException; +} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 9deb281a45ffe..81a8dbd5fc9c1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -18,6 +18,7 @@ package org.apache.spark.shuffle.sort; import java.nio.channels.Channels; +import java.util.Optional; import javax.annotation.Nullable; import java.io.*; import java.nio.channels.FileChannel; @@ -53,6 +54,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.api.ShufflePartitionWriter; +import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; @@ -215,31 +217,15 @@ void closeAndWriteOutput() throws IOException { serOutputStream = null; final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; - final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents - .createMapOutputWriter( - shuffleId, - mapId, - taskContext.taskAttemptId(), - partitioner.numPartitions()); final long[] partitionLengths; try { - try { - partitionLengths = mergeSpills(spills, mapWriter); - } finally { - for (SpillInfo spill : spills) { - if (spill.file.exists() && !spill.file.delete()) { - logger.error("Error while deleting spill file {}", spill.file.getPath()); - } + partitionLengths = mergeSpills(spills); + } finally { + for (SpillInfo spill : spills) { + if (spill.file.exists() && !spill.file.delete()) { + logger.error("Error while deleting spill file {}", spill.file.getPath()); } } - mapWriter.commitAllPartitions(); - } catch (Exception e) { - try { - mapWriter.abort(e); - } catch (Exception innerE) { - logger.error("Failed to abort the Map Output Writer", innerE); - } - throw e; } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); } @@ -273,57 +259,93 @@ void forceSorterToSpill() throws IOException { * * @return the partition lengths in the merged file. */ - private long[] mergeSpills(SpillInfo[] spills, - ShuffleMapOutputWriter mapWriter) throws IOException { + private long[] mergeSpills(SpillInfo[] spills) throws IOException { + long[] partitionLengths; + if (spills.length == 0) { + final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents + .createMapOutputWriter( + shuffleId, + mapId, + taskContext.taskAttemptId(), + partitioner.numPartitions()); + mapWriter.commitAllPartitions(); + return new long[partitioner.numPartitions()]; + } else if (spills.length == 1) { + Optional maybeSingleFileWriter = + shuffleExecutorComponents.createSingleFileMapOutputWriter( + shuffleId, mapId, taskContext.taskAttemptId()); + if (maybeSingleFileWriter.isPresent()) { + // Here, we don't need to perform any metrics updates because the bytes written to this + // output file would have already been counted as shuffle bytes written. 
+ partitionLengths = spills[0].partitionLengths; + maybeSingleFileWriter.get().transferMapOutputFile(spills[0].file, partitionLengths); + } else { + partitionLengths = mergeSpillsUsingStandardWriter(spills); + } + } else { + partitionLengths = mergeSpillsUsingStandardWriter(spills); + } + return partitionLengths; + } + + private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOException { + long[] partitionLengths; final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS()); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = - (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE()); + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE()); final boolean fastMergeIsSupported = !compressionEnabled || - CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); + CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); - final int numPartitions = partitioner.numPartitions(); - long[] partitionLengths = new long[numPartitions]; + final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents + .createMapOutputWriter( + shuffleId, + mapId, + taskContext.taskAttemptId(), + partitioner.numPartitions()); try { - if (spills.length == 0) { - return partitionLengths; - } else { - // There are multiple spills to merge, so none of these spill files' lengths were counted - // towards our shuffle write count or shuffle write time. If we use the slow merge path, - // then the final output file's size won't necessarily be equal to the sum of the spill - // files' sizes. To guard against this case, we look at the output file's actual size when - // computing shuffle bytes written. - // - // We allow the individual merge methods to report their own IO times since different merge - // strategies use different IO techniques. We count IO during merge towards the shuffle - // shuffle write time, which appears to be consistent with the "not bypassing merge-sort" - // branch in ExternalSorter. - if (fastMergeEnabled && fastMergeIsSupported) { - // Compression is disabled or we are using an IO compression codec that supports - // decompression of concatenated compressed streams, so we can perform a fast spill merge - // that doesn't need to interpret the spilled bytes. - if (transferToEnabled && !encryptionEnabled) { - logger.debug("Using transferTo-based fast merge"); - partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); - } else { - logger.debug("Using fileStream-based fast merge"); - partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); - } + // There are multiple spills to merge, so none of these spill files' lengths were counted + // towards our shuffle write count or shuffle write time. If we use the slow merge path, + // then the final output file's size won't necessarily be equal to the sum of the spill + // files' sizes. To guard against this case, we look at the output file's actual size when + // computing shuffle bytes written. + // + // We allow the individual merge methods to report their own IO times since different merge + // strategies use different IO techniques. We count IO during merge towards the shuffle + // shuffle write time, which appears to be consistent with the "not bypassing merge-sort" + // branch in ExternalSorter. 
+ if (fastMergeEnabled && fastMergeIsSupported) { + // Compression is disabled or we are using an IO compression codec that supports + // decompression of concatenated compressed streams, so we can perform a fast spill merge + // that doesn't need to interpret the spilled bytes. + if (transferToEnabled && !encryptionEnabled) { + logger.debug("Using transferTo-based fast merge"); + partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); } else { - logger.debug("Using slow merge"); - partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); + logger.debug("Using fileStream-based fast merge"); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); } - // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has - // in-memory records, we write out the in-memory records to a file but do not count that - // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs - // to be counted as shuffle write, but this will lead to double-counting of the final - // SpillInfo's bytes. - writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); - return partitionLengths; + } else { + logger.debug("Using slow merge"); + partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); + } + // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has + // in-memory records, we write out the in-memory records to a file but do not count that + // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs + // to be counted as shuffle write, but this will lead to double-counting of the final + // SpillInfo's bytes. + writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); + mapWriter.commitAllPartitions(); + } catch (Exception e) { + try { + mapWriter.abort(e); + } catch (Exception e2) { + logger.warn("Failed to abort writing the map output.", e2); + e.addSuppressed(e2); } - } catch (IOException e) { throw e; } + return partitionLengths; } /** diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java index 02eb710737285..0fadb1fcb7837 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java @@ -19,11 +19,13 @@ import com.google.common.annotations.VisibleForTesting; +import java.util.Optional; import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.shuffle.api.ShuffleExecutorComponents; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; import org.apache.spark.storage.BlockManager; public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorComponents { @@ -68,4 +70,16 @@ public ShuffleMapOutputWriter createMapOutputWriter( return new LocalDiskShuffleMapOutputWriter( shuffleId, mapId, numPartitions, blockResolver, sparkConf); } + + @Override + public Optional createSingleFileMapOutputWriter( + int shuffleId, + int mapId, + long mapTaskAttemptId) { + if (blockResolver == null) { + throw new IllegalStateException( + "Executor components must be initialized before getting writers."); + } + return Optional.of(new 
LocalDiskSingleFileMapOutputWriter(shuffleId, mapId, blockResolver)); + } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 7bf323ed849d7..8d741cca88e5a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -95,6 +95,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I } return new LocalDiskShufflePartitionWriter(reducePartitionId); } + @Override public void commitAllPartitions() throws IOException { // Check the position after transferTo loop to see if it is in the right position and raise a diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java new file mode 100644 index 0000000000000..ba6d87245806c --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.sort.io; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import org.apache.spark.shuffle.IndexShuffleBlockResolver; +import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; +import org.apache.spark.util.Utils; + +public class LocalDiskSingleFileMapOutputWriter + implements SingleFileShuffleMapOutputWriter { + + private final int shuffleId; + private final int mapId; + private final IndexShuffleBlockResolver blockResolver; + + public LocalDiskSingleFileMapOutputWriter( + int shuffleId, + int mapId, + IndexShuffleBlockResolver blockResolver) { + this.shuffleId = shuffleId; + this.mapId = mapId; + this.blockResolver = blockResolver; + } + + @Override + public void transferMapOutputFile( + File mapOutputFile, + long[] partitionLengths) throws IOException { + File outputFile = blockResolver.getDataFile(shuffleId, mapId); + File tempFile = Utils.tempFileWith(outputFile); + Files.move(mapOutputFile.toPath(), tempFile.toPath()); + blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempFile); + } +} From fc798b7fbbe921470fc4425ac4b913ae14b48e46 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 2 Aug 2019 16:05:40 -0700 Subject: [PATCH 07/16] Address other comments --- .../sort/io/LocalDiskShuffleExecutorComponents.java | 3 ++- .../shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java | 9 +++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java index 0fadb1fcb7837..316ec4a817916 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java @@ -17,9 +17,10 @@ package org.apache.spark.shuffle.sort.io; +import java.util.Optional; + import com.google.common.annotations.VisibleForTesting; -import java.util.Optional; import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.shuffle.api.ShuffleExecutorComponents; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 8d741cca88e5a..db8dadbcd2194 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -138,8 +138,6 @@ private void cleanUp() throws IOException { private void initStream() throws IOException { if (outputFileStream == null) { - // This file needs to opened in append mode in order to work around a Linux kernel bug that - // affects transferTo; see SPARK-3948 for more details. outputFileStream = new FileOutputStream(outputTempFile, true); } if (outputBufferedFileStream == null) { @@ -148,11 +146,10 @@ private void initStream() throws IOException { } private void initChannel() throws IOException { - if (outputFileStream == null) { - outputFileStream = new FileOutputStream(outputTempFile, true); - } + // This file needs to opened in append mode in order to work around a Linux kernel bug that + // affects transferTo; see SPARK-3948 for more details. 
if (outputFileChannel == null) { - outputFileChannel = outputFileStream.getChannel(); + outputFileChannel = new FileOutputStream(outputTempFile, true).getChannel(); } } From 48335fa835d1b1a70bdb6a79107d0dd65dd61497 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 16 Aug 2019 15:06:28 -0700 Subject: [PATCH 08/16] Address comments --- .../shuffle/api/ShuffleExecutorComponents.java | 2 +- ...va => SingleSpillShuffleMapOutputWriter.java} | 8 ++++---- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 6 +++--- .../io/LocalDiskShuffleExecutorComponents.java | 6 +++--- ... => LocalDiskSingleSpillMapOutputWriter.java} | 16 +++++++++------- 5 files changed, 20 insertions(+), 18 deletions(-) rename core/src/main/java/org/apache/spark/shuffle/api/{SingleFileShuffleMapOutputWriter.java => SingleSpillShuffleMapOutputWriter.java} (81%) rename core/src/main/java/org/apache/spark/shuffle/sort/io/{LocalDiskSingleFileMapOutputWriter.java => LocalDiskSingleSpillMapOutputWriter.java} (75%) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java index ddac85f074ef2..f8df49b91380f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java @@ -69,7 +69,7 @@ ShuffleMapOutputWriter createMapOutputWriter( * with the same (shuffleId, mapId) pair can be distinguished by the * different values of mapTaskAttemptId. */ - default Optional createSingleFileMapOutputWriter( + default Optional createSingleFileMapOutputWriter( int shuffleId, int mapId, long mapTaskAttemptId) throws IOException { diff --git a/core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java similarity index 81% rename from core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java rename to core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java index d57db921afef6..cad8dcfda52bc 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/SingleFileShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java @@ -18,8 +18,8 @@ package org.apache.spark.shuffle.api; import java.io.File; - import java.io.IOException; + import org.apache.spark.annotation.Private; /** @@ -27,10 +27,10 @@ * file to the backing store. */ @Private -public interface SingleFileShuffleMapOutputWriter { +public interface SingleSpillShuffleMapOutputWriter { /** - * Transfer a file that contains the bytes of all the splits written by this map task. + * Transfer a file that contains the bytes of all the partitions written by this map task. 
*/ - void transferMapOutputFile(File mapOutputFile, long[] partitionLengths) throws IOException; + void transferMapSpillFile(File mapOutputFile, long[] partitionLengths) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 81a8dbd5fc9c1..527a895ba97ec 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -54,7 +54,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.api.ShufflePartitionWriter; -import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; +import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter; import org.apache.spark.shuffle.api.WritableByteChannelWrapper; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.Platform; @@ -271,14 +271,14 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException { mapWriter.commitAllPartitions(); return new long[partitioner.numPartitions()]; } else if (spills.length == 1) { - Optional maybeSingleFileWriter = + Optional maybeSingleFileWriter = shuffleExecutorComponents.createSingleFileMapOutputWriter( shuffleId, mapId, taskContext.taskAttemptId()); if (maybeSingleFileWriter.isPresent()) { // Here, we don't need to perform any metrics updates because the bytes written to this // output file would have already been counted as shuffle bytes written. partitionLengths = spills[0].partitionLengths; - maybeSingleFileWriter.get().transferMapOutputFile(spills[0].file, partitionLengths); + maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths); } else { partitionLengths = mergeSpillsUsingStandardWriter(spills); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java index 316ec4a817916..47aa2e39fe29b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java @@ -26,7 +26,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.IndexShuffleBlockResolver; -import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; +import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter; import org.apache.spark.storage.BlockManager; public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorComponents { @@ -73,7 +73,7 @@ public ShuffleMapOutputWriter createMapOutputWriter( } @Override - public Optional createSingleFileMapOutputWriter( + public Optional createSingleFileMapOutputWriter( int shuffleId, int mapId, long mapTaskAttemptId) { @@ -81,6 +81,6 @@ public Optional createSingleFileMapOutputWrite throw new IllegalStateException( "Executor components must be initialized before getting writers."); } - return Optional.of(new LocalDiskSingleFileMapOutputWriter(shuffleId, mapId, blockResolver)); + return Optional.of(new LocalDiskSingleSpillMapOutputWriter(shuffleId, mapId, blockResolver)); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java similarity index 75% rename from core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java rename to core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java index ba6d87245806c..18ef1dff25167 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleFileMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java @@ -21,17 +21,17 @@ import java.io.IOException; import java.nio.file.Files; import org.apache.spark.shuffle.IndexShuffleBlockResolver; -import org.apache.spark.shuffle.api.SingleFileShuffleMapOutputWriter; +import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter; import org.apache.spark.util.Utils; -public class LocalDiskSingleFileMapOutputWriter - implements SingleFileShuffleMapOutputWriter { +public class LocalDiskSingleSpillMapOutputWriter + implements SingleSpillShuffleMapOutputWriter { private final int shuffleId; private final int mapId; private final IndexShuffleBlockResolver blockResolver; - public LocalDiskSingleFileMapOutputWriter( + public LocalDiskSingleSpillMapOutputWriter( int shuffleId, int mapId, IndexShuffleBlockResolver blockResolver) { @@ -41,12 +41,14 @@ public LocalDiskSingleFileMapOutputWriter( } @Override - public void transferMapOutputFile( - File mapOutputFile, + public void transferMapSpillFile( + File mapSpillFile, long[] partitionLengths) throws IOException { + // The map spill file already has the proper format, and it contains all of the partition data. + // So just transfer it directly to the destination without any merging. File outputFile = blockResolver.getDataFile(shuffleId, mapId); File tempFile = Utils.tempFileWith(outputFile); - Files.move(mapOutputFile.toPath(), tempFile.toPath()); + Files.move(mapSpillFile.toPath(), tempFile.toPath()); blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tempFile); } } From cf046dfa7608696feeda2a645e6fed55180f8cd7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 16 Aug 2019 17:11:37 -0700 Subject: [PATCH 09/16] Fix typo --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 527a895ba97ec..0522421279737 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -312,8 +312,8 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep // // We allow the individual merge methods to report their own IO times since different merge // strategies use different IO techniques. We count IO during merge towards the shuffle - // shuffle write time, which appears to be consistent with the "not bypassing merge-sort" - // branch in ExternalSorter. + // write time, which appears to be consistent with the "not bypassing merge-sort" branch in + // ExternalSorter. 
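The branch below picks one of three merge strategies from flags computed earlier in mergeSpillsUsingStandardWriter. A condensed sketch of that selection logic, assuming the same flag names used in this method:

// Sketch only: mirrors the strategy choice in the hunk below.
static String chooseMergeStrategy(
    boolean fastMergeEnabled,
    boolean fastMergeIsSupported,
    boolean transferToEnabled,
    boolean encryptionEnabled) {
  if (fastMergeEnabled && fastMergeIsSupported) {
    // Fast merge: spill bytes can be concatenated without decompressing them first.
    return transferToEnabled && !encryptionEnabled
        ? "transferTo-based fast merge"
        : "fileStream-based fast merge";
  }
  // Slow merge: bytes are decompressed and re-encoded through the compression codec.
  return "slow merge";
}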
if (fastMergeEnabled && fastMergeIsSupported) { // Compression is disabled or we are using an IO compression codec that supports // decompression of concatenated compressed streams, so we can perform a fast spill merge From 97cd3e7e570ba842f7cb343c978ea614ebf2cdef Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 26 Aug 2019 15:33:39 -0700 Subject: [PATCH 10/16] Address comment --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 0522421279737..406b8b8a7a111 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -458,8 +458,7 @@ private long[] mergeSpillsWithTransferTo( .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer))); try { for (int i = 0; i < spills.length; i++) { - long partitionLengthInSpill = 0L; - partitionLengthInSpill += spills[i].partitionLengths[partition]; + long partitionLengthInSpill = spills[i].partitionLengths[partition]; final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); Utils.copyFileStreamNIO( From 0cb1a4433f737381fa557e6015539b533f8ed91d Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 26 Aug 2019 15:36:48 -0700 Subject: [PATCH 11/16] Fix merge conflicts --- .../shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 4847253caf07e..3463bd6f1775b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -97,8 +97,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I } @Override -<<<<<<< HEAD - public void commitAllPartitions() throws IOException { + public long[] commitAllPartitions() throws IOException { // Check the position after transferTo loop to see if it is in the right position and raise a // exception if it is incorrect. The position will not be increased to the expected length // after calling transferTo in kernel version 2.6.32. This issue is described at @@ -112,11 +111,6 @@ public void commitAllPartitions() throws IOException { "to unexpected behavior when using transferTo. You can set " + "spark.file.transferTo=false to disable this NIO feature."); } -||||||| merged common ancestors - public void commitAllPartitions() throws IOException { -======= - public long[] commitAllPartitions() throws IOException { ->>>>>>> origin/master cleanUp(); File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? 
outputTempFile : null; blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp); From 41ceaea9d40c297c34ec7753b19b69bf623dd976 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 30 Aug 2019 15:59:56 -0700 Subject: [PATCH 12/16] Don't store partition lengths twice --- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 406b8b8a7a111..ed286b3d3abfb 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -320,14 +320,14 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep // that doesn't need to interpret the spilled bytes. if (transferToEnabled && !encryptionEnabled) { logger.debug("Using transferTo-based fast merge"); - partitionLengths = mergeSpillsWithTransferTo(spills, mapWriter); + mergeSpillsWithTransferTo(spills, mapWriter); } else { logger.debug("Using fileStream-based fast merge"); - partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, null); + mergeSpillsWithFileStream(spills, mapWriter, null); } } else { logger.debug("Using slow merge"); - partitionLengths = mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); + mergeSpillsWithFileStream(spills, mapWriter, compressionCodec); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has // in-memory records, we write out the in-memory records to a file but do not count that @@ -335,7 +335,7 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep // to be counted as shuffle write, but this will lead to double-counting of the final // SpillInfo's bytes. writeMetrics.decBytesWritten(spills[spills.length - 1].file.length()); - mapWriter.commitAllPartitions(); + partitionLengths = mapWriter.commitAllPartitions(); } catch (Exception e) { try { mapWriter.abort(e); @@ -364,12 +364,11 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled. * @return the partition lengths in the merged file. */ - private long[] mergeSpillsWithFileStream( + private void mergeSpillsWithFileStream( SpillInfo[] spills, ShuffleMapOutputWriter mapWriter, @Nullable CompressionCodec compressionCodec) throws IOException { final int numPartitions = partitioner.numPartitions(); - final long[] partitionLengths = new long[numPartitions]; final InputStream[] spillInputStreams = new InputStream[spills.length]; boolean threwException = true; @@ -415,7 +414,6 @@ private long[] mergeSpillsWithFileStream( Closeables.close(partitionOutput, copyThrewExecption); } long numBytesWritten = writer.getNumBytesWritten(); - partitionLengths[partition] = numBytesWritten; writeMetrics.incBytesWritten(numBytesWritten); } threwException = false; @@ -426,7 +424,6 @@ private long[] mergeSpillsWithFileStream( Closeables.close(stream, threwException); } } - return partitionLengths; } /** @@ -438,11 +435,10 @@ private long[] mergeSpillsWithFileStream( * @param mapWriter the map output writer to use for output. * @return the partition lengths in the merged file. 
*/ - private long[] mergeSpillsWithTransferTo( + private void mergeSpillsWithTransferTo( SpillInfo[] spills, ShuffleMapOutputWriter mapWriter) throws IOException { final int numPartitions = partitioner.numPartitions(); - final long[] partitionLengths = new long[numPartitions]; final FileChannel[] spillInputChannels = new FileChannel[spills.length]; final long[] spillInputChannelPositions = new long[spills.length]; From c125a1443351cefc57f81c9c24eea20acd0c87d3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 30 Aug 2019 16:02:42 -0700 Subject: [PATCH 13/16] Fix build --- .../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index ed286b3d3abfb..a94e8af56c96f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -470,7 +470,6 @@ private void mergeSpillsWithTransferTo( Closeables.close(resolvedChannel, copyThrewExecption); } long numBytes = writer.getNumBytesWritten(); - partitionLengths[partition] = numBytes; writeMetrics.incBytesWritten(numBytes); } threwException = false; @@ -482,7 +481,6 @@ private void mergeSpillsWithTransferTo( Closeables.close(spillInputChannels[i], threwException); } } - return partitionLengths; } @Override From f8f95f335f46f842cd5a41fa8cb53a1f16ef7141 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 6 Sep 2019 17:57:05 -0700 Subject: [PATCH 14/16] Address comments --- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 17 ++++++++--------- .../io/LocalDiskShuffleMapOutputWriter.java | 3 +-- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index a94e8af56c96f..c18df43e5e746 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -268,8 +268,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException { mapId, taskContext.taskAttemptId(), partitioner.numPartitions()); - mapWriter.commitAllPartitions(); - return new long[partitioner.numPartitions()]; + return mapWriter.commitAllPartitions(); } else if (spills.length == 1) { Optional maybeSingleFileWriter = shuffleExecutorComponents.createSingleFileMapOutputWriter( @@ -379,7 +378,7 @@ private void mergeSpillsWithFileStream( inputBufferSizeInBytes); } for (int partition = 0; partition < numPartitions; partition++) { - boolean copyThrewExecption = true; + boolean copyThrewException = true; ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition); OutputStream partitionOutput = writer.openStream(); try { @@ -409,9 +408,9 @@ private void mergeSpillsWithFileStream( } } } - copyThrewExecption = false; + copyThrewException = false; } finally { - Closeables.close(partitionOutput, copyThrewExecption); + Closeables.close(partitionOutput, copyThrewException); } long numBytesWritten = writer.getNumBytesWritten(); writeMetrics.incBytesWritten(numBytesWritten); @@ -458,10 +457,10 @@ private void mergeSpillsWithTransferTo( final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); Utils.copyFileStreamNIO( - 
spillInputChannel, - resolvedChannel.channel(), - spillInputChannelPositions[i], - partitionLengthInSpill); + spillInputChannel, + resolvedChannel.channel(), + spillInputChannelPositions[i], + partitionLengthInSpill); copyThrewExecption = false; spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index 3463bd6f1775b..444cdc4270ecd 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -102,8 +102,7 @@ public long[] commitAllPartitions() throws IOException { // exception if it is incorrect. The position will not be increased to the expected length // after calling transferTo in kernel version 2.6.32. This issue is described at // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948. - if (outputFileChannel != null - && outputFileChannel.position() != bytesWrittenToMergedFile) { + if (outputFileChannel != null && outputFileChannel.position() != bytesWrittenToMergedFile) { throw new IOException( "Current position " + outputFileChannel.position() + " does not equal expected " + "position " + bytesWrittenToMergedFile + " after transferTo. Please check your " + diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index de56a8d0434e6..72201ffaf1e60 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -137,7 +137,7 @@ public void setUp() throws IOException { when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); - Answer renameTempAnswer = invocationOnMock -> { + Answer renameTempAnswer = invocationOnMock -> { partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; File tmp = (File) invocationOnMock.getArguments()[3]; if (!mergedOutputFile.delete()) { From 94a36534285759972987a35249bf063bb0f84ba0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Sat, 7 Sep 2019 23:26:01 -0700 Subject: [PATCH 15/16] Fix one more typo --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index c18df43e5e746..dad7cc427f24e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -447,7 +447,7 @@ private void mergeSpillsWithTransferTo( spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel(); } for (int partition = 0; partition < numPartitions; partition++) { - boolean copyThrewExecption = true; + boolean copyThrewException = true; ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition); WritableByteChannelWrapper resolvedChannel = writer.openChannelWrapper() .orElseGet(() -> new StreamFallbackChannelWrapper(openStreamUnchecked(writer))); @@ -461,12 +461,12 @@ private void mergeSpillsWithTransferTo( resolvedChannel.channel(), 
spillInputChannelPositions[i], partitionLengthInSpill); - copyThrewExecption = false; + copyThrewException = false; spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } } finally { - Closeables.close(resolvedChannel, copyThrewExecption); + Closeables.close(resolvedChannel, copyThrewException); } long numBytes = writer.getNumBytesWritten(); writeMetrics.incBytesWritten(numBytes); From 85a101dff8a7a58d293c283857a219b6e73510de Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 9 Sep 2019 18:59:23 -0700 Subject: [PATCH 16/16] Address comments --- .../org/apache/spark/shuffle/api/ShuffleExecutorComponents.java | 2 +- .../java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 2 +- .../shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java | 1 + .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java index f8df49b91380f..804119cd06fa6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java @@ -18,8 +18,8 @@ package org.apache.spark.shuffle.api; import java.io.IOException; - import java.util.Optional; + import org.apache.spark.annotation.Private; /** diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index dad7cc427f24e..f59bddc993639 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -292,7 +292,7 @@ private long[] mergeSpillsUsingStandardWriter(SpillInfo[] spills) throws IOExcep final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS()); final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf); final boolean fastMergeEnabled = - (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE()); + (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FAST_MERGE_ENABLE()); final boolean fastMergeIsSupported = !compressionEnabled || CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec); final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled(); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java index 18ef1dff25167..6b0a797a61b52 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; + import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter; import org.apache.spark.util.Utils; diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b898413ac8d76..158a4b7cfa55a 100644 --- 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1026,7 +1026,7 @@ package object config { .booleanConf .createWithDefault(false) - private[spark] val SHUFFLE_UNDAFE_FAST_MERGE_ENABLE = + private[spark] val SHUFFLE_UNSAFE_FAST_MERGE_ENABLE = ConfigBuilder("spark.shuffle.unsafe.fastMergeEnabled") .doc("Whether to perform a fast spill merge.") .booleanConf diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 72201ffaf1e60..1022111897a49 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -408,7 +408,7 @@ public void mergeSpillsWithFileStreamAndCompressionAndEncryption() throws Except @Test public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception { - conf.set(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE(), false); + conf.set(package$.MODULE$.SHUFFLE_UNSAFE_FAST_MERGE_ENABLE(), false); testMergingSpills(false, LZ4CompressionCodec.class.getName(), true); }
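The renamed constant still maps to the key spark.shuffle.unsafe.fastMergeEnabled, which the suite above flips to exercise the slow merge path. A short sketch of driving the same path from a plain SparkConf, using string keys instead of the package$ constants (the class name here is illustrative):

import org.apache.spark.SparkConf;

public class SlowMergePathConfExample {
  public static void main(String[] args) {
    // Forces UnsafeShuffleWriter down the slow (decompress-and-recompress) merge path,
    // since fast merge is disabled while shuffle compression stays on.
    SparkConf conf = new SparkConf()
        .set("spark.shuffle.unsafe.fastMergeEnabled", "false")
        .set("spark.shuffle.compress", "true");
    // conf would then be passed to the SparkContext or test harness under test.
  }
}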