Skip to content

Commit b6a989c

Browse files
committed
Fix container memory beyond limit, were killed
1 parent 549830b commit b6a989c

File tree

6 files changed

+20
-21
lines changed

6 files changed

+20
-21
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
7171

7272
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
7373
// Memory for the ApplicationMaster.
74-
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
74+
capability.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
7575
amContainer.setResource(capability)
7676

7777
appContext.setQueue(args.amQueue)

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
9898
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
9999

100100
if (minimumMemory > 0) {
101-
val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
101+
val mem = (args.executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
102102
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
103103

104104
if (numCore > 0) {

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ private[yarn] class YarnAllocationHandler(
9999
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
100100

101101
def isResourceConstraintSatisfied(container: Container): Boolean = {
102-
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
102+
container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
103+
ceil.toInt
103104
}
104105

105106
def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +230,7 @@ private[yarn] class YarnAllocationHandler(
229230
val containerId = container.getId
230231

231232
assert( container.getResource.getMemory >=
232-
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
233+
(executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
233234

234235
if (numExecutorsRunningNow > maxExecutors) {
235236
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +451,7 @@ private[yarn] class YarnAllocationHandler(
450451

451452
if (numExecutors > 0) {
452453
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
453-
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
454+
(executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD)).ceil.toInt)
454455
} else {
455456
logDebug("Empty allocation req .. release : " + releasedContainerList)
456457
}
@@ -505,7 +506,7 @@ private[yarn] class YarnAllocationHandler(
505506
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
506507
val memCapability = Records.newRecord(classOf[Resource])
507508
// There probably is some overhead here, let's reserve a bit more memory.
508-
memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
509+
memCapability.setMemory((executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
509510
rsrcRequest.setCapability(memCapability)
510511

511512
val pri = Records.newRecord(classOf[Priority])
@@ -544,8 +545,8 @@ object YarnAllocationHandler {
544545
// request types (like map/reduce in hadoop for example)
545546
val PRIORITY = 1
546547

547-
// Additional memory overhead - in mb
548-
val MEMORY_OVERHEAD = 384
548+
// Additional memory overhead
549+
val MEMORY_OVERHEAD = 1.25D
549550

550551
// Host to rack map - saved from allocation requests
551552
// We are expecting this not to change.

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,7 @@ trait ClientBase extends Logging {
7171
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
7272
"Error: You must specify a user jar when running in standalone mode!"),
7373
(args.userClass == null) -> "Error: You must specify a user class!",
74-
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
75-
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
76-
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
77-
(args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
78-
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
74+
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!"
7975
).foreach { case(cond, errStr) =>
8076
if (cond) {
8177
logError(errStr)
@@ -98,7 +94,7 @@ trait ClientBase extends Logging {
9894
format(args.executorMemory, maxMem))
9995
System.exit(1)
10096
}
101-
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
97+
val amMem = (args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
10298
if (amMem > maxMem) {
10399
logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
104100
format(args.amMemory, maxMem))

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
8484

8585
// Memory for the ApplicationMaster.
8686
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
87-
memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
87+
memoryResource.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
8888
appContext.setResource(memoryResource)
8989

9090
// Finally, submit and monitor the application.

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ private[yarn] class YarnAllocationHandler(
106106
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
107107

108108
def isResourceConstraintSatisfied(container: Container): Boolean = {
109-
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
109+
container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
110+
ceil.toInt
110111
}
111112

112113
def releaseContainer(container: Container) {
@@ -248,7 +249,8 @@ private[yarn] class YarnAllocationHandler(
248249
val executorHostname = container.getNodeId.getHost
249250
val containerId = container.getId
250251

251-
val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
252+
val executorMemoryOverhead = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
253+
ceil.toInt
252254
assert(container.getResource.getMemory >= executorMemoryOverhead)
253255

254256
if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +479,7 @@ private[yarn] class YarnAllocationHandler(
477479
numPendingAllocate.addAndGet(numExecutors)
478480
logInfo("Will Allocate %d executor containers, each with %d memory".format(
479481
numExecutors,
480-
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
482+
(executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt))
481483
} else {
482484
logDebug("Empty allocation request ...")
483485
}
@@ -537,7 +539,7 @@ private[yarn] class YarnAllocationHandler(
537539
priority: Int
538540
): ArrayBuffer[ContainerRequest] = {
539541

540-
val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
542+
val memoryRequest = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
541543
val resource = Resource.newInstance(memoryRequest, executorCores)
542544

543545
val prioritySetting = Records.newRecord(classOf[Priority])
@@ -558,8 +560,8 @@ object YarnAllocationHandler {
558560
// request types (like map/reduce in hadoop for example)
559561
val PRIORITY = 1
560562

561-
// Additional memory overhead - in mb.
562-
val MEMORY_OVERHEAD = 384
563+
// Additional memory overhead.
564+
val MEMORY_OVERHEAD = 1.25D
563565

564566
// Host to rack map - saved from allocation requests. We are expecting this not to change.
565567
// Note that it is possible for this to change : and ResurceManager will indicate that to us via

0 commit comments

Comments
 (0)