From 15718776665db6ebff463587e726bb1e704dabc7 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 1 Dec 2015 15:44:34 -0800
Subject: [PATCH 1/2] Carve out 300MB reserved memory

The space shared by storage and execution is now calculated as
(JVM heap size - 300MB) * `spark.memory.fraction` (default 0.75).
---
 .../spark/memory/UnifiedMemoryManager.scala   | 16 +++++++++++++---
 .../memory/UnifiedMemoryManagerSuite.scala    | 20 ++++++++++++++++++++
 docs/configuration.md                         |  4 ++--
 docs/tuning.md                                |  2 +-
 4 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 8be5b0541909..f992b7afc3d4 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  */
 private[spark] class UnifiedMemoryManager private[memory] (
     conf: SparkConf,
-    maxMemory: Long,
+    val maxMemory: Long,
     private val storageRegionSize: Long,
     numCores: Int)
   extends MemoryManager(
@@ -130,6 +130,8 @@ private[spark] class UnifiedMemoryManager private[memory] (

 object UnifiedMemoryManager {

+  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
+
   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
     val maxMemory = getMaxMemory(conf)
     new UnifiedMemoryManager(
@@ -144,8 +146,16 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
+      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val minSystemMemory = reservedMemory * 1.5
+    if (systemMemory < minSystemMemory) {
+      throw new IllegalArgumentException(s"System memory $systemMemory must " +
+        s"be at least $minSystemMemory. Please use a larger heap size.")
+    }
+    val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
-    (systemMaxMemory * memoryFraction).toLong
+    (usableMemory * memoryFraction).toLong
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 8cebe81c3bff..e97c898a4478 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -182,4 +182,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
     assertEnsureFreeSpaceCalled(ms, 850L)
   }

+  test("small heap") {
+    val systemMemory = 1024 * 1024
+    val reservedMemory = 300 * 1024
+    val memoryFraction = 0.8
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", memoryFraction.toString)
+      .set("spark.testing.memory", systemMemory.toString)
+      .set("spark.testing.reservedMemory", reservedMemory.toString)
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+    val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
+    assert(mm.maxMemory === expectedMaxMemory)
+
+    // Try using a system memory that's too small
+    val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+    val exception = intercept[IllegalArgumentException] {
+      UnifiedMemoryManager(conf2, numCores = 1)
+    }
+    assert(exception.getMessage.contains("larger heap size"))
+  }
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 741d6b2b37a8..c39b4890851b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -719,8 +719,8 @@ Apart from these, the following properties are also available, and may be useful in some situations:
   <td><code>spark.memory.fraction</code></td>
   <td>0.75</td>
   <td>
-    Fraction of the heap space used for execution and storage. The lower this is, the more
-    frequently spills and cached data eviction occur. The purpose of this config is to set
+    Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
+    more frequently spills and cached data eviction occur. The purpose of this config is to set
     aside memory for internal metadata, user data structures, and imprecise size estimation
     in the case of sparse, unusually large records. Leaving this at the default value is
     recommended. For more detail, see
diff --git a/docs/tuning.md b/docs/tuning.md
index a8fe7a453279..e73ed69ffbbf 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -114,7 +114,7 @@ variety of workloads without requiring user expertise of how memory is divided internally.
 Although there are two relevant configurations, the typical user should not need to adjust them
 as the default values are applicable to most workloads:

-* `spark.memory.fraction` expresses the size of `M` as a fraction of the total JVM heap space
+* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
   (default 0.75). The rest of the space (25%) is reserved for user data structures, internal
   metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
   large records.
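To make the new sizing concrete, here is a minimal, self-contained Scala sketch of the arithmetic that getMaxMemory now performs. It is not part of the patch; the object and method names are invented for illustration.

    object ReservedMemorySketch {
      // Mirrors RESERVED_SYSTEM_MEMORY_BYTES in the patch: 300MB carved out up front.
      val reservedBytes: Long = 300L * 1024 * 1024

      // Shared execution + storage memory = (heap - reserved) * spark.memory.fraction.
      def maxSharedMemory(systemMemory: Long, memoryFraction: Double = 0.75): Long = {
        val minSystemMemory = (reservedBytes * 1.5).toLong
        // require throws IllegalArgumentException, matching the patch's behavior.
        require(systemMemory >= minSystemMemory,
          s"System memory $systemMemory must be at least $minSystemMemory. " +
            "Please use a larger heap size.")
        ((systemMemory - reservedBytes) * memoryFraction).toLong
      }

      def main(args: Array[String]): Unit = {
        // A 1GB heap leaves (1024 - 300) * 0.75 = 543MB for execution and storage.
        println(maxSharedMemory(1024L * 1024 * 1024) / (1024 * 1024)) // prints 543
        // Heaps below 1.5 * 300MB = 450MB fail fast instead of running with too
        // little usable memory, e.g. maxSharedMemory(400L * 1024 * 1024) throws.
      }
    }

Note the design choice this illustrates: the 1.5x floor rejects heaps where the fixed carve-out would dominate, which is exactly what the "small heap" test above exercises.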
From 2e9bc4de1e53b722406750b22964b0f1d96fbef8 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 1 Dec 2015 17:01:50 -0800
Subject: [PATCH 2/2] Add some comments

---
 .../org/apache/spark/memory/UnifiedMemoryManager.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index f992b7afc3d4..48b4e23433e4 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
  * either side can borrow memory from the other.
  *
- * The region shared between execution and storage is a fraction of the total heap space
+ * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
  * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
  * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
  * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
@@ -130,6 +130,10 @@ private[spark] class UnifiedMemoryManager private[memory] (

 object UnifiedMemoryManager {

+  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
+  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
+  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
+  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
   private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
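Following the updated class comment, a quick sketch of how the default region sizes fall out of the two fractions under the new carve-out (illustrative values only, not code from the patch):

    object RegionSizeSketch {
      def main(args: Array[String]): Unit = {
        val heapMb = 1024.0
        val usableMb = heapMb - 300           // after the 300MB reserved carve-out
        val sharedMb = usableMb * 0.75        // spark.memory.fraction
        val storageRegionMb = sharedMb * 0.5  // spark.memory.storageFraction
        // With a 1GB heap: shared = 543.0MB, storage region = 271.5MB.
        println(s"shared = ${sharedMb}MB, storage region = ${storageRegionMb}MB")
      }
    }

In other words, the 0.375 figure in the class comment is now 0.375 of the heap minus the reserved 300MB, not of the raw heap.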