Commit acff9a7

[SPARK-2503] Lower shuffle output buffer (spark.shuffle.file.buffer.kb) to 32KB.

This can substantially reduce memory usage during shuffle.

Author: Reynold Xin <[email protected]>

Closes #1781 from rxin/SPARK-2503-spark.shuffle.file.buffer.kb and squashes the following commits:

104b8d8 [Reynold Xin] [SPARK-2503] Lower shuffle output buffer (spark.shuffle.file.buffer.kb) to 32KB.
1 parent cc491f6 commit acff9a7
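
The commit message only says memory usage "can substantially" drop. As an illustration that is not part of the patch, the sketch below estimates per-executor buffer memory under the assumption that the shuffle write path keeps one buffered writer open per reduce partition for each concurrently running map task; the reducer and task counts are hypothetical.

// Rough, illustrative estimate (not from this commit) of buffer memory held by
// open shuffle writers. Assumes one buffered writer per reduce partition per
// concurrently running map task; all figures below are hypothetical.
object ShuffleBufferEstimate {
  def bufferMemoryBytes(reducePartitions: Int, concurrentTasks: Int, bufferKb: Int): Long =
    reducePartitions.toLong * concurrentTasks * bufferKb * 1024L

  def main(args: Array[String]): Unit = {
    val reducers = 2000       // hypothetical number of reduce partitions
    val tasksPerExecutor = 8  // hypothetical concurrent map tasks per executor
    val before = bufferMemoryBytes(reducers, tasksPerExecutor, 100) // old default
    val after  = bufferMemoryBytes(reducers, tasksPerExecutor, 32)  // new default
    println(s"Buffer memory with 100 KB buffers: ${before / (1024 * 1024)} MB")
    println(s"Buffer memory with 32 KB buffers:  ${after / (1024 * 1024)} MB")
  }
}

Under these assumed numbers the open-buffer footprint falls from roughly 1.5 GB to about 500 MB per executor, which is the kind of reduction the commit message alludes to.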

5 files changed (+5, -5)


core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   private val ser = Serializer.getSerializer(dep.serializer.orNull)
 
   private val conf = SparkEnv.get.conf
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   val sortBasedShuffle =
     conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
 
-  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L
 
-  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
 
   // Size of object batches when reading/writing from serializers.
   //

docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -266,7 +266,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.shuffle.file.buffer.kb</code></td>
-  <td>100</td>
+  <td>32</td>
   <td>
     Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers
     reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
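
For reference, a minimal sketch (not part of this commit) of how an application could override the new 32 KB default through SparkConf. The application name and the value shown are only examples; the property name and its kilobyte unit come from the documentation table above.

// Hypothetical user-side override: applications that prefer the old behaviour
// can set the buffer back to 100 KB explicitly before creating the context.
import org.apache.spark.{SparkConf, SparkContext}

object RestoreOldShuffleBuffer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-buffer-override")       // hypothetical application name
      .set("spark.shuffle.file.buffer.kb", "100")  // value is interpreted in kilobytes
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}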
