2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -248,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
.map(Utils.memoryStringToMb)
- .getOrElse(512)
+ .getOrElse(1024)

// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
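To make the fallback order in the hunk above easier to follow, here is a minimal standalone sketch (not Spark's actual code): the object name, the simplified `memoryStringToMb` stand-in, and the omission of the `warnSparkMem` warning are all illustrative assumptions, but the resolution order and the new 1024 MB default mirror the diff.

```scala
object ExecutorMemorySketch {
  // Simplified stand-in for Utils.memoryStringToMb; handles only "m"/"g" suffixes.
  def memoryStringToMb(s: String): Int = {
    val lower = s.trim.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else lower.toInt // assume the value is already in MB
  }

  // Mirrors the fallback chain: explicit conf value, then the two environment
  // variables, then the default, which this change raises from 512 MB to 1024 MB.
  def resolveExecutorMemoryMb(confValue: Option[String]): Int = {
    confValue
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM")))
      .map(memoryStringToMb)
      .getOrElse(1024)
  }

  def main(args: Array[String]): Unit = {
    println(resolveExecutorMemoryMb(Some("2g"))) // 2048
    println(resolveExecutorMemoryMb(None))       // 1024 when neither env var is set
  }
}
```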
@@ -1045,7 +1045,7 @@ private[spark] object BlockManager extends Logging {

def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
- (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+ (Utils.effectiveMaxMemory(conf) * memoryFraction).toLong
}

def getHeartBeatFrequency(conf: SparkConf): Long =
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1022,4 +1022,13 @@ private[spark] object Utils extends Logging {
def getHadoopFileSystem(path: URI): FileSystem = {
FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
}

+ /**
+ * Determine the system's effective maximum memory after taking into account
+ * `spark.system.reservedMemorySize`
+ */
+ def effectiveMaxMemory(conf: SparkConf) = {
+ val reservedMemorySize = memoryStringToMb(conf.get("spark.system.reservedMemorySize", "300m"))
+ Runtime.getRuntime.maxMemory - (1024 * 1024 * reservedMemorySize)
+ }
}
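As a rough illustration of the subtraction the new helper performs, here is a standalone sketch with made-up numbers (not the PR's code): it takes plain arguments instead of a SparkConf, and it treats a 1 GB heap as exactly 1024 MB even though `Runtime.getRuntime.maxMemory` usually reports somewhat less.

```scala
object EffectiveMemorySketch {
  // Same arithmetic as the new helper, but on plain numbers instead of a SparkConf.
  def effectiveMaxMemory(maxHeapBytes: Long, reservedMb: Long): Long =
    maxHeapBytes - 1024L * 1024L * reservedMb

  def main(args: Array[String]): Unit = {
    val heapBytes = 1024L * 1024L * 1024L               // pretend the executor heap is exactly 1 GB
    val effective = effectiveMaxMemory(heapBytes, 300)  // 300 MB default reservation
    println(effective / (1024 * 1024))                  // 724 (MB left for the fraction-based budgets)
  }
}
```

Because the reservation is a constant rather than a fraction, the amount of heap set aside for Spark's own code and user code stays the same regardless of how large the executor heap is.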
@@ -77,7 +77,8 @@ class ExternalAppendOnlyMap[K, V, C](
private val maxMemoryThreshold = {
val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
- (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ val effectiveMaxMemory = org.apache.spark.util.Utils.effectiveMaxMemory(sparkConf)
+ (effectiveMaxMemory * memoryFraction * safetyFraction).toLong
}

// Number of pairs in the in-memory map
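To see what the threshold above works out to, here is a back-of-the-envelope sketch; the 1 GB heap is an assumption for illustration, combined with the 300m default reservation and the default shuffle and safety fractions shown in the hunk.

```scala
object SpillThresholdSketch {
  def main(args: Array[String]): Unit = {
    val effectiveMaxMemory = (1024L - 300L) * 1024L * 1024L  // ~724 MB after the 300m reservation
    val memoryFraction = 0.3                                 // spark.shuffle.memoryFraction default
    val safetyFraction = 0.8                                 // spark.shuffle.safetyFraction default
    val maxMemoryThreshold = (effectiveMaxMemory * memoryFraction * safetyFraction).toLong
    println(maxMemoryThreshold / (1024 * 1024))              // ~173 MB of in-memory maps before spilling
  }
}
```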
26 changes: 17 additions & 9 deletions docs/configuration.md
@@ -40,7 +40,7 @@ there are at least five properties that you will commonly want to control:
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.executor.memory</td>
- <td>512m</td>
+ <td>1g</td>
<td>
Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
</td>
@@ -102,24 +102,32 @@ Apart from these, the following properties are also available, and may be useful
<code>reduceByKey</code>, etc) when not set by user.
</td>
</tr>
+ <tr>
+ <td>spark.system.reservedMemorySize</td>
+ <td>300m</td>
+ <td>
+ Constant amount of heap to reserve on executors for Spark's own code and user code. Taken into account before calculating
+ memory available for storage and shuffle as configured in <code>spark.storage.memoryFraction</code> and
+ <code>spark.shuffle.memoryFraction</code>.
+ </td>
+ </tr>

Contributor: This seems a bit high - I spoke with @aarondav and we were thinking 200m might be a better default for this. He said after GC the actual space used ends up being ~100m. The issue with setting it too high is that it might falsely reserve un-used space.

Author: Let's talk more during the QA phase to figure out the right numbers. I've actually got a test I'm running that still OOMs at 300m, but does fine when at 350m.
<tr>
<td>spark.storage.memoryFraction</td>
<td>0.6</td>
<td>
- Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
- it if you configure your own old generation size.
+ Fraction of Java heap to use for Spark's memory cache, after accounting for <code>spark.system.reservedMemorySize</code>.
+ The effective size should not be larger than the "old" generation of objects in the JVM.
</td>
</tr>
<tr>
<td>spark.shuffle.memoryFraction</td>
<td>0.3</td>
<td>
- Fraction of Java heap to use for aggregation and cogroups during shuffles, if
- <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
- all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
- begin to spill to disk. If spills are often, consider increasing this value at the expense of
- <code>spark.storage.memoryFraction</code>.
+ Fraction of Java heap to use for aggregation and cogroups during shuffles, after accounting for
+ <code>spark.system.reservedMemorySize</code>, and if <code>spark.shuffle.spill</code> is true.
+ At any given time, the collective size of all in-memory maps used for shuffles is bounded by this
+ limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing
+ this value at the expense of <code>spark.storage.memoryFraction</code>.

Contributor: This sentence reads a little weird. Move "if spark.shuffle.spill is true," to the front?
</td>
</tr>
<tr>
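Taken together, the three properties documented above could be set on a SparkConf as in the sketch below. The values and the numbers in the comment are illustrative assumptions (a 2 GB executor heap), not recommendations from this change.

```scala
import org.apache.spark.SparkConf

// Illustrative only: a 2 GB heap minus the 300 MB reservation leaves ~1748 MB,
// so the cache gets about 0.6 * 1748 ≈ 1049 MB and in-memory shuffle maps about
// 0.3 * 0.8 * 1748 ≈ 419 MB before spilling to disk.
val conf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.system.reservedMemorySize", "300m")
  .set("spark.storage.memoryFraction", "0.6")
  .set("spark.shuffle.memoryFraction", "0.3")
```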
13 changes: 7 additions & 6 deletions docs/tuning.md
@@ -163,14 +163,15 @@ their work directories), *not* on your driver program.
**Cache Size Tuning**

One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
- By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
- cache RDDs. This means that 40% of memory is available for any objects created during task execution.
+ By default, Spark caches RDDs using 60% of the configured executor memory (`spark.executor.memory`), after accounting for
+ the reserved memory defined by `spark.system.reservedMemorySize`. The remaining memory is used for
+ other objects, for example to perform shuffles in-memory. You can change the amount of heap used for caching
+ by setting `spark.storage.memoryFraction`. For example, to change this to 50%, you can call
+ `conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf.

In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
- memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
- `conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf. Combined with the use of serialized caching,
- using a smaller cache should be sufficient to mitigate most of the garbage collection problems.
- In case you are interested in further tuning the Java GC, continue reading below.
+ memory, lowering `spark.storage.memoryFraction` or raising `spark.system.reservedMemorySize`
+ can provide enough headroom for GC.
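For example, a short sketch applying both adjustments on a SparkConf; the 0.5 and 400m values are made up for illustration and should be tuned per workload.

```scala
import org.apache.spark.SparkConf

// Illustrative values only: shrink the cache's share of the effective heap and
// enlarge the fixed reservation so task objects and GC have more headroom.
val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.system.reservedMemorySize", "400m")
```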

**Advanced GC Tuning**
