diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7750514ae13..815912a6283a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -248,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
     .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
     .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
     .map(Utils.memoryStringToMb)
-    .getOrElse(512)
+    .getOrElse(1024)

   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a2a729130091..e4432b6fe093 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1045,7 +1045,7 @@ private[spark] object BlockManager extends Logging {

   def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    (Utils.effectiveMaxMemory(conf) * memoryFraction).toLong
   }

   def getHeartBeatFrequency(conf: SparkConf): Long =
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 59da51f3e029..baeab59efb14 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1022,4 +1022,13 @@ private[spark] object Utils extends Logging {
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Determine the system's effective maximum memory after taking into account
+   * `spark.system.reservedMemorySize`
+   */
+  def effectiveMaxMemory(conf: SparkConf) = {
+    val reservedMemorySize = memoryStringToMb(conf.get("spark.system.reservedMemorySize", "300m"))
+    Runtime.getRuntime.maxMemory - (1024 * 1024 * reservedMemorySize)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index dd01ae821f70..a6fcb9a3d5bb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -77,7 +77,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val maxMemoryThreshold = {
     val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    val effectiveMaxMemory = org.apache.spark.util.Utils.effectiveMaxMemory(sparkConf)
+    (effectiveMaxMemory * memoryFraction * safetyFraction).toLong
   }

   // Number of pairs in the in-memory map
diff --git a/docs/configuration.md b/docs/configuration.md
index 9c602402f063..a7ffb9244b4c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -40,7 +40,7 @@ there are at least five properties that you will commonly want to control:
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td>spark.executor.memory</td>
-  <td>512m</td>
+  <td>1g</td>
   <td>
     Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
   </td>
@@ -102,24 +102,32 @@ Apart from these, the following properties are also available, and may be useful
     reduceByKey, etc) when not set by user.
   </td>
 </tr>
+<tr>
+  <td>spark.system.reservedMemorySize</td>
+  <td>300m</td>
+  <td>
+    Constant amount of heap to reserve on executors for Spark's own code and user code. This is taken into account
+    before calculating the memory available for storage and shuffle as configured by <code>spark.storage.memoryFraction</code>
+    and <code>spark.shuffle.memoryFraction</code>.
+  </td>
+</tr>
 <tr>
   <td>spark.storage.memoryFraction</td>
   <td>0.6</td>
   <td>
-    Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
-    it if you configure your own old generation size.
+    Fraction of Java heap to use for Spark's memory cache, after accounting for <code>spark.system.reservedMemorySize</code>.
+    The effective size should not be larger than the "old" generation of objects in the JVM.
   </td>
 </tr>
 <tr>
   <td>spark.shuffle.memoryFraction</td>
   <td>0.3</td>
   <td>
-    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
-    <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
-    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
-    begin to spill to disk. If spills are often, consider increasing this value at the expense of
-    <code>spark.storage.memoryFraction</code>.
+    Fraction of Java heap to use for aggregation and cogroups during shuffles, after accounting for
+    <code>spark.system.reservedMemorySize</code>, and if <code>spark.shuffle.spill</code> is true.
+    At any given time, the collective size of all in-memory maps used for shuffles is bounded by this
+    limit, beyond which the contents will begin to spill to disk. If spills happen often, consider increasing
+    this value at the expense of <code>spark.storage.memoryFraction</code>.
   </td>
 </tr>
 <tr>
diff --git a/docs/tuning.md b/docs/tuning.md
index 093df3187a78..0873e2f6cae6 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -163,14 +163,15 @@ their work directories), *not* on your driver program.
 **Cache Size Tuning**

 One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
-By default, Spark uses 60% of the configured executor memory (`spark.executor.memory`) to
-cache RDDs. This means that 40% of memory is available for any objects created during task execution.
+By default, Spark caches RDDs using 60% of the configured executor memory (`spark.executor.memory`), after accounting for
+the reserved memory defined by `spark.system.reservedMemorySize`. The remaining memory is used for
+other objects, for example to perform shuffles in memory. You can change the amount of heap used for caching
+by setting `spark.storage.memoryFraction`. For example, to change this to 50%, you can call
+`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf.

 In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
-memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
-`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf. Combined with the use of serialized caching,
-using a smaller cache should be sufficient to mitigate most of the garbage collection problems.
-In case you are interested in further tuning the Java GC, continue reading below.
+memory, lowering `spark.storage.memoryFraction` or raising `spark.system.reservedMemorySize`
+can provide enough headroom for GC.

 **Advanced GC Tuning**
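For reference, here is a rough standalone sketch of the budget arithmetic this patch introduces, assuming a 4g executor heap and the default settings shown above. The object and helper names below are made up for illustration; only `Utils.effectiveMaxMemory` exists in the patch.

```scala
// Illustrative only: mirrors the arithmetic of Utils.effectiveMaxMemory,
// BlockManager.getMaxMemory and ExternalAppendOnlyMap.maxMemoryThreshold
// using plain constants instead of a SparkConf.
object ReservedMemoryBudgetSketch {
  def main(args: Array[String]): Unit = {
    val maxHeapBytes    = 4L * 1024 * 1024 * 1024  // assume the executor runs with -Xmx4g
    val reservedBytes   = 300L * 1024 * 1024       // spark.system.reservedMemorySize default (300m)
    val storageFraction = 0.6                      // spark.storage.memoryFraction default
    val shuffleFraction = 0.3                      // spark.shuffle.memoryFraction default
    val shuffleSafety   = 0.8                      // spark.shuffle.safetyFraction default

    // As in Utils.effectiveMaxMemory: subtract the reserved chunk before applying any fraction.
    val effectiveMax = maxHeapBytes - reservedBytes

    val storageBudget = (effectiveMax * storageFraction).toLong                 // cf. BlockManager.getMaxMemory
    val shuffleBudget = (effectiveMax * shuffleFraction * shuffleSafety).toLong // cf. ExternalAppendOnlyMap threshold

    def mb(bytes: Long): Long = bytes / (1024 * 1024)
    println(s"effective max memory: ${mb(effectiveMax)} MB (was ${mb(maxHeapBytes)} MB)")
    println(s"storage cache budget: ${mb(storageBudget)} MB")
    println(s"shuffle map budget:   ${mb(shuffleBudget)} MB")
  }
}
```

With the defaults, the reservation removes a constant ~300 MB from the base that both fractions are applied to, so small heaps see the largest relative drop in cache and shuffle budgets.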