diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 13fd18c0942b1..ac9d335d63591 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -87,7 +87,7 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private final SparkConf sparkConf; private final boolean transferToEnabled; private final int initialSortBufferSize; - private final int inputBufferSizeInBytes; + private final int mergeBufferSizeInBytes; @Nullable private MapStatus mapStatus; @Nullable private ShuffleExternalSorter sorter; @@ -140,8 +140,8 @@ public UnsafeShuffleWriter( this.transferToEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO()); this.initialSortBufferSize = (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()); - this.inputBufferSizeInBytes = - (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024; + this.mergeBufferSizeInBytes = + (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_MERGE_BUFFER_SIZE()) * 1024; open(); } @@ -372,7 +372,7 @@ private void mergeSpillsWithFileStream( for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( spills[i].file, - inputBufferSizeInBytes); + mergeBufferSizeInBytes); // Only convert the partitionLengths when debug level is enabled. 
if (logger.isDebugEnabled()) { logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i, diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 940d72df5df69..e9e411cc56b51 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1466,6 +1466,14 @@ package object config { s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.") .createWithDefaultString("32k") + private[spark] val SHUFFLE_FILE_MERGE_BUFFER_SIZE = + ConfigBuilder("spark.shuffle.file.merge.buffer") + .doc("Size of the in-memory buffer for each shuffle file input stream, in KiB unless " + + "otherwise specified. These buffers are allocated off-heap, and one buffer is created " + + "per spill file being merged, so excessively large values should be avoided.") + .version("4.0.0") + .fallbackConf(SHUFFLE_FILE_BUFFER_SIZE) + private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.shuffle.unsafe.file.output.buffer") .doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)") diff --git a/docs/configuration.md b/docs/configuration.md index 2881660eded62..532da87f5626f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1029,6 +1029,16 @@ Apart from these, the following properties are also available, and may be useful 1.4.0 + + spark.shuffle.file.merge.buffer + 32k + + Size of the in-memory buffer for each shuffle file input stream, in KiB unless otherwise + specified. These buffers are allocated off-heap, and one buffer is created per spill file + being merged, so excessively large values should be avoided. + + 4.0.0 + spark.shuffle.unsafe.file.output.buffer 32k