
Commit cdf2b04

witgo authored and tgravescs committed
[SPARK-1930] The Container is running beyond physical memory limits, so as to be killed
Author: witgo <[email protected]>

Closes apache#894 from witgo/SPARK-1930 and squashes the following commits:

564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed
1 parent 4fdb491 commit cdf2b04

7 files changed: 46 additions & 18 deletions


docs/running-on-yarn.md

Lines changed: 14 additions & 0 deletions
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, and other native overheads.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.driver.memoryOverhead</code></td>
+  <td>384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, and other native overheads.
+  </td>
+</tr>
 </table>
 
 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
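With these two settings, the off-heap headroom YARN reserves on top of each container's heap becomes user-tunable. A minimal sketch of how an application might raise both (the 512 MB values are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical example: raise both overheads above the 384 MB default
// when containers are being killed for exceeding physical memory limits.
val conf = new SparkConf()
  .setAppName("yarn-memory-overhead-example")
  .set("spark.yarn.executor.memoryOverhead", "512") // MB per executor container
  .set("spark.yarn.driver.memoryOverhead", "512")   // MB for the driver/AM container
```

The same keys can also be set wherever the application's Spark properties are normally configured.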

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 2 additions & 2 deletions
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)
 
     appContext.setQueue(args.amQueue)
@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
       ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
+        memoryOverhead)
     amMemory
   }
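The second hunk rounds the requested AM memory up to a multiple of the scheduler's minimum allocation before reserving the overhead. A self-contained sketch of that arithmetic with hypothetical numbers:

```scala
// Reproduces the rounding in the hunk above with illustrative values.
val requestedAmMemory = 1000 // args.amMemory in MB (hypothetical)
val minResMemory = 384       // RM minimum resource capability in MB (hypothetical)
val memoryOverhead = 384     // the built-in MEMORY_OVERHEAD default

// Round the request up to the next multiple of minResMemory...
val roundedUp = (requestedAmMemory / minResMemory) * minResMemory +
  (if (requestedAmMemory % minResMemory == 0) 0 else minResMemory)
// ...then leave memoryOverhead of that allocation for non-heap usage.
val amMemory = roundedUp - memoryOverhead

println(s"rounded=$roundedUp heap=$amMemory") // rounded=1152 heap=768
```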

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 3 additions & 1 deletion
@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
+
     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
     // Compute number of threads for akka
     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
 
     if (minimumMemory > 0) {
-      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+        YarnAllocationHandler.MEMORY_OVERHEAD)
       val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
       if (numCore > 0) {
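The thread count computed here is a ceiling division of the padded executor memory by the RM's minimum allocation. A small sketch with hypothetical values:

```scala
// Ceiling division as written in the hunk above (illustrative numbers).
val executorMemory = 1024 // MB, hypothetical
val memoryOverhead = 384  // MB, the built-in default
val minimumMemory = 512   // MB, hypothetical RM minimum allocation

val mem = executorMemory + memoryOverhead // 1408 MB
val numCore = (mem / minimumMemory) + (if (mem % minimumMemory != 0) 1 else 0)
// numCore == 3, i.e. ceil(1408 / 512)
```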

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 8 additions & 4 deletions
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
   private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
       val containerId = container.getId
 
       assert(container.getResource.getMemory >=
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        (executorMemory + memoryOverhead))
 
       if (numExecutorsRunningNow > maxExecutors) {
         logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(
 
       if (numExecutors > 0) {
         logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
-          executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+          executorMemory + memoryOverhead))
       } else {
         logDebug("Empty allocation req .. release : " + releasedContainerList)
       }
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
       val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
       val memCapability = Records.newRecord(classOf[Resource])
       // There probably is some overhead here, let's reserve a bit more memory.
-      memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+      memCapability.setMemory(executorMemory + memoryOverhead)
       rsrcRequest.setCapability(memCapability)
 
       val pri = Records.newRecord(classOf[Priority])
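The new `memoryOverhead` accessor is a plain lookup-with-fallback on `SparkConf`: the user-facing key wins if set, otherwise the hard-coded default applies. A minimal, self-contained sketch of the pattern (the 512 value is hypothetical):

```scala
import org.apache.spark.SparkConf

val MEMORY_OVERHEAD = 384 // mirrors YarnAllocationHandler.MEMORY_OVERHEAD

// loadDefaults = false keeps system properties out of this toy example.
val sparkConf = new SparkConf(loadDefaults = false)
  .set("spark.yarn.executor.memoryOverhead", "512")

val memoryOverhead =
  sparkConf.getInt("spark.yarn.executor.memoryOverhead", MEMORY_OVERHEAD)
// 512 here; falls back to 384 when the key is unset
```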

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 9 additions & 5 deletions
@@ -65,17 +65,21 @@ trait ClientBase extends Logging {
   val APP_FILE_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
+  // Additional memory overhead - in mb.
+  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
         "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
+        "greater than: " + memoryOverhead),
+      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
+        "must be greater than: " + memoryOverhead.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -101,7 +105,7 @@ trait ClientBase extends Logging {
       logError(errorMessage)
       throw new IllegalArgumentException(errorMessage)
     }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val amMem = args.amMemory + memoryOverhead
     if (amMem > maxMem) {
 
       val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
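`validateArgs()` keeps its table-driven shape: each entry maps a failing condition to its error message, and the `foreach` acts on whichever conditions hold. A stripped-down sketch of the idiom (values hypothetical; the real method reports failures via `logError` as shown in the hunk):

```scala
// Table-driven argument validation, as in ClientBase.validateArgs().
val amMemory = 256       // hypothetical --am-memory value in MB
val memoryOverhead = 384 // default overhead in MB

Map(
  (amMemory <= memoryOverhead) ->
    ("Error: AM memory size must be greater than: " + memoryOverhead)
).foreach { case (cond, errStr) =>
  if (cond) println(errStr) // the commit calls logError here instead
}
```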

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 2 additions & 2 deletions
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memoryResource.setMemory(args.amMemory + memoryOverhead)
     appContext.setResource(memoryResource)
 
     // Finally, submit and monitor the application.
@@ -117,7 +117,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
     //   ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-    //     YarnAllocationHandler.MEMORY_OVERHEAD)
+    //     memoryOverhead)
     args.amMemory
   }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 8 additions & 4 deletions
@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
+  // Additional memory overhead - in mb.
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }
 
   def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
 
-      val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+      val executorMemoryOverhead = (executorMemory + memoryOverhead)
       assert(container.getResource.getMemory >= executorMemoryOverhead)
 
       if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
       numPendingAllocate.addAndGet(numExecutors)
       logInfo("Will Allocate %d executor containers, each with %d memory".format(
         numExecutors,
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+        (executorMemory + memoryOverhead)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-      val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val memoryRequest = executorMemory + memoryOverhead
       val resource = Resource.newInstance(memoryRequest, executorCores)
 
       val prioritySetting = Records.newRecord(classOf[Priority])
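On the stable API, the padded figure feeds directly into `Resource.newInstance` when sizing container requests. A hedged sketch of just that step (memory and core counts are illustrative):

```scala
import org.apache.hadoop.yarn.api.records.Resource

// Size a container request with the configurable overhead folded in.
val executorMemory = 1024 // MB, hypothetical
val memoryOverhead = 384  // MB, from spark.yarn.executor.memoryOverhead or default
val executorCores = 2     // hypothetical

val memoryRequest = executorMemory + memoryOverhead
val resource: Resource = Resource.newInstance(memoryRequest, executorCores)
println(s"requesting ${resource.getMemory} MB, ${resource.getVirtualCores} vcores")
```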
