UnifiedMemoryManager.scala

@@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
  * either side can borrow memory from the other.
  *
- * The region shared between execution and storage is a fraction of the total heap space
+ * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
  * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
  * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
  * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
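To make the revised arithmetic concrete, here is a quick sketch (plain Scala, not part of this patch) of the default sizing for a 1GB heap:

```scala
// Illustrative numbers only, using the defaults described in the classdoc above.
val heapMb = 1024.0
val reservedMb = 300.0                       // the new 300MB reservation, in MB
val sharedMb = (heapMb - reservedMb) * 0.75  // 543.0 MB shared by execution and storage
val storageMb = sharedMb * 0.5               // 271.5 MB storage region by default
println(s"shared = $sharedMb MB, storage = $storageMb MB")
```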
@@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  */
 private[spark] class UnifiedMemoryManager private[memory] (
     conf: SparkConf,
-    maxMemory: Long,
+    val maxMemory: Long,
     private val storageRegionSize: Long,
     numCores: Int)
   extends MemoryManager(

@@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] (

 object UnifiedMemoryManager {
 
+  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
+  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
+  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
+  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
+  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
+
Contributor: you should document what this is and update the classdoc for UnifiedMemoryManager

Contributor: For a non-small heap, the above memory doesn't need to be reserved, right?

   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
     val maxMemory = getMaxMemory(conf)
     new UnifiedMemoryManager(

@@ -144,8 +150,16 @@ object UnifiedMemoryManager {
    * Return the total amount of memory shared between execution and storage, in bytes.
    */
   private def getMaxMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
+      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+    val minSystemMemory = reservedMemory * 1.5
+    if (systemMemory < minSystemMemory) {
+      throw new IllegalArgumentException(s"System memory $systemMemory must " +
+        s"be at least $minSystemMemory. Please use a larger heap size.")

Comment: This comes out as scientific notation, which made me laugh. (Indeed: `minSystemMemory` is a `Double` here, so the interpolated message prints something like `4.718592E8`.)

+    }
+    val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
-    (systemMaxMemory * memoryFraction).toLong
+    (usableMemory * memoryFraction).toLong
   }
 }
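For readers who want to poke at the new sizing logic outside of Spark, here is a self-contained sketch; the object and method names are ours, and the `.toLong` cast is one way to avoid the scientific-notation message flagged above, not something the patch does:

```scala
// Standalone sketch mirroring getMaxMemory: reserve 300MB, enforce a minimum
// heap of 1.5x the reservation, then apply the memory fraction.
object ReservedMemoryDemo {
  private val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  def maxSharedMemory(systemMemory: Long, memoryFraction: Double = 0.75): Long = {
    // Cast to Long so the error message prints 471859200 rather than 4.718592E8.
    val minSystemMemory = (ReservedSystemMemoryBytes * 1.5).toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(
        s"System memory $systemMemory must be at least $minSystemMemory. " +
          "Please use a larger heap size.")
    }
    ((systemMemory - ReservedSystemMemoryBytes) * memoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    println(maxSharedMemory(1024L * 1024 * 1024)) // 569376768 bytes, ~543MB for a 1GB heap
  }
}
```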
UnifiedMemoryManagerSuite.scala

@@ -182,4 +182,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assertEnsureFreeSpaceCalled(ms, 850L)
   }
 
+  test("small heap") {
+    val systemMemory = 1024 * 1024
+    val reservedMemory = 300 * 1024
+    val memoryFraction = 0.8
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", memoryFraction.toString)
+      .set("spark.testing.memory", systemMemory.toString)
+      .set("spark.testing.reservedMemory", reservedMemory.toString)
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+    val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
+    assert(mm.maxMemory === expectedMaxMemory)
+
+    // Try using a system memory that's too small
+    val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
+    val exception = intercept[IllegalArgumentException] {
+      UnifiedMemoryManager(conf2, numCores = 1)
+    }
+    assert(exception.getMessage.contains("larger heap size"))
+  }
+
 }
docs/configuration.md

@@ -719,8 +719,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.memory.fraction</code></td>
   <td>0.75</td>
   <td>
-    Fraction of the heap space used for execution and storage. The lower this is, the more
-    frequently spills and cached data eviction occur. The purpose of this config is to set
+    Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
+    more frequently spills and cached data eviction occur. The purpose of this config is to set
     aside memory for internal metadata, user data structures, and imprecise size estimation
     in the case of sparse, unusually large records. Leaving this at the default value is
     recommended. For more detail, see <a href="tuning.html#memory-management-overview">
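For anyone who does need to adjust it, the property is set like any other Spark conf; a minimal sketch (the 0.6 value is purely illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative: shrink the unified region to 60% of (heap - 300MB),
// leaving more headroom for user data structures.
val conf = new SparkConf().set("spark.memory.fraction", "0.6")
```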
docs/tuning.md

@@ -114,7 +114,7 @@ variety of workloads without requiring user expertise of how memory is divided i
 Although there are two relevant configurations, the typical user should not need to adjust them
 as the default values are applicable to most workloads:
 
-* `spark.memory.fraction` expresses the size of `M` as a fraction of the total JVM heap space
+* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
   (default 0.75). The rest of the space (25%) is reserved for user data structures, internal
   metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
   large records.
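To put illustrative numbers on this: with a 4GB heap, `M = (4096 - 300) * 0.75 ≈ 2847MB` by default, leaving roughly 949MB of the usable space, plus the 300MB reservation, for user data structures, internal metadata, and safeguarding against OOM.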