From b6a989cf36ab0ea2c3a5e9ff8860b22da99163c9 Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 28 May 2014 00:28:24 +0800
Subject: [PATCH 01/11] Fix containers being killed for running beyond memory limits

---
 .../org/apache/spark/deploy/yarn/Client.scala | 2 +-
 .../spark/deploy/yarn/ExecutorLauncher.scala | 2 +-
 .../spark/deploy/yarn/YarnAllocationHandler.scala | 13 +++++++------
 .../org/apache/spark/deploy/yarn/ClientBase.scala | 8 ++------
 .../org/apache/spark/deploy/yarn/Client.scala | 2 +-
 .../spark/deploy/yarn/YarnAllocationHandler.scala | 14 ++++++++------
 6 files changed, 20 insertions(+), 21 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207de42b..c1b497db8aec 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    capability.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
     amContainer.setResource(capability)

     appContext.setQueue(args.amQueue)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a3bd91590fc2..ba064d8244f5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -98,7 +98,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()

     if (minimumMemory > 0) {
-      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = (args.executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
       val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)

       if (numCore > 0) {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 856391e52b2d..ca2f6ec00ff6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -99,7 +99,8 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
+      ceil.toInt
   }

   def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +230,7 @@ private[yarn] class YarnAllocationHandler(
       val containerId = container.getId
       assert( container.getResource.getMemory >=
-        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)

       if (numExecutorsRunningNow > maxExecutors) {
         logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +451,7 @@ private[yarn] class YarnAllocationHandler(

     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
-        executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt))
     } else {
       logDebug("Empty allocation req .. release : " + releasedContainerList)
     }
@@ -505,7 +506,7 @@ private[yarn] class YarnAllocationHandler(
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+    memCapability.setMemory((executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
     rsrcRequest.setCapability(memCapability)

     val pri = Records.newRecord(classOf[Priority])
@@ -544,8 +545,8 @@ object YarnAllocationHandler {
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1

-  // Additional memory overhead - in mb
-  val MEMORY_OVERHEAD = 384
+  // Additional memory overhead
+  val MEMORY_OVERHEAD = 1.25D

   // Host to rack map - saved from allocation requests
   // We are expecting this not to change.
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index aeb3f0062df3..aa9c7917bac2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -71,11 +71,7 @@ trait ClientBase extends Logging {
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
         "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
-        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
-        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!"
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -98,7 +94,7 @@ trait ClientBase extends Logging {
         format(args.executorMemory, maxMem))
       System.exit(1)
     }
-    val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val amMem = (args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
     if (amMem > maxMem) {
       logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
format(args.amMemory, maxMem)) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1b6bfb42a5c1..78cc343af53b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa // Memory for the ApplicationMaster. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] - memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + memoryResource.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt) appContext.setResource(memoryResource) // Finally, submit and monitor the application. diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index a979fe4d6263..df176cc0701b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -106,7 +106,8 @@ private[yarn] class YarnAllocationHandler( def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD). + ceil.toInt } def releaseContainer(container: Container) { @@ -248,7 +249,8 @@ private[yarn] class YarnAllocationHandler( val executorHostname = container.getNodeId.getHost val containerId = container.getId - val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + val executorMemoryOverhead = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD). + ceil.toInt assert(container.getResource.getMemory >= executorMemoryOverhead) if (numExecutorsRunningNow > maxExecutors) { @@ -477,7 +479,7 @@ private[yarn] class YarnAllocationHandler( numPendingAllocate.addAndGet(numExecutors) logInfo("Will Allocate %d executor containers, each with %d memory".format( numExecutors, - (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))) + (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)) } else { logDebug("Empty allocation request ...") } @@ -537,7 +539,7 @@ private[yarn] class YarnAllocationHandler( priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val memoryRequest = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt val resource = Resource.newInstance(memoryRequest, executorCores) val prioritySetting = Records.newRecord(classOf[Priority]) @@ -558,8 +560,8 @@ object YarnAllocationHandler { // request types (like map/reduce in hadoop for example) val PRIORITY = 1 - // Additional memory overhead - in mb. - val MEMORY_OVERHEAD = 384 + // Additional memory overhead. + val MEMORY_OVERHEAD = 1.25D // Host to rack map - saved from allocation requests. We are expecting this not to change. 
  // Note that it is possible for this to change : and ResourceManager will indicate that to us via

From e0dcc165689e26b2029c8bd634818cfa1da6e7da Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 28 May 2014 11:44:01 +0800
Subject: [PATCH 02/11] Adding configuration items

---
 .../org/apache/spark/deploy/yarn/Client.scala | 4 +--
 .../spark/deploy/yarn/ExecutorLauncher.scala | 29 ++++++++++---------
 .../deploy/yarn/YarnAllocationHandler.scala | 15 +++++-----
 .../apache/spark/deploy/yarn/ClientBase.scala | 11 +++++--
 .../org/apache/spark/deploy/yarn/Client.scala | 4 +--
 .../deploy/yarn/YarnAllocationHandler.scala | 16 +++++-----
 6 files changed, 42 insertions(+), 37 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c1b497db8aec..e98287ca239f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
+    capability.setMemory(args.amMemory + memoryOverhead)
     amContainer.setResource(capability)

     appContext.setQueue(args.amQueue)
@@ -116,7 +116,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val minResMemory = newApp.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
       ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-        YarnAllocationHandler.MEMORY_OVERHEAD)
+        memoryOverhead)
     amMemory
   }
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ba064d8244f5..05e18eb7b7e1 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -92,21 +92,22 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()

-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = (args.executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-      }
-    }
+    // // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+    // // TODO: Uncomment when hadoop is on a version which has this fixed.
+    // val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    // // Compute number of threads for akka
+    // val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    // // Additional memory overhead - in mb.
+    // val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+    //
+    // if (minimumMemory > 0) {
+    //   val mem = args.executorMemory + memoryOverhead
+    //   val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+    //   if (numCore > 0) {
+    //     args.workerCores = numCore
+    //   }
+    // }

     waitForSparkMaster()
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index ca2f6ec00ff6..b46e300efcee 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -88,6 +88,9 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

+  // Additional memory overhead - in mb.
+  private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+
   private val numExecutorsRunning = new AtomicInteger()
   // Used to generate a unique id per executor
   private val executorIdCounter = new AtomicInteger()
@@ -99,8 +102,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
-      ceil.toInt
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }

   def allocateContainers(executorsToRequest: Int) {
@@ -230,7 +232,7 @@ private[yarn] class YarnAllocationHandler(
       val containerId = container.getId
       assert( container.getResource.getMemory >=
-        (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
+        (executorMemory + memoryOverhead))

       if (numExecutorsRunningNow > maxExecutors) {
         logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -451,7 +453,7 @@ private[yarn] class YarnAllocationHandler(

     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
-        (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt))
+        executorMemory + memoryOverhead))
     } else {
       logDebug("Empty allocation req .. release : " + releasedContainerList)
     }
@@ -506,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
     val memCapability = Records.newRecord(classOf[Resource])
     // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory((executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
+    memCapability.setMemory(executorMemory + memoryOverhead)
     rsrcRequest.setCapability(memCapability)

     val pri = Records.newRecord(classOf[Priority])
@@ -545,8 +547,8 @@ object YarnAllocationHandler {
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1

-  // Additional memory overhead
-  val MEMORY_OVERHEAD = 1.25D
-
   // Host to rack map - saved from allocation requests
   // We are expecting this not to change.
  // Note that it is possible for this to change : and RM will indicate that to us via update
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index aa9c7917bac2..771723db4fdb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -58,6 +58,9 @@ trait ClientBase extends Logging {
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()

+  // Additional memory overhead - in mb.
+  val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
@@ -71,7 +74,11 @@ trait ClientBase extends Logging {
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
         "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
-      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!"
+      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
+      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be " +
+        "greater than: " + memoryOverhead),
+      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size " +
+        "must be greater than: " + memoryOverhead.toString)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
@@ -94,7 +101,7 @@ trait ClientBase extends Logging {
         format(args.executorMemory, maxMem))
       System.exit(1)
     }
-    val amMem = (args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
+    val amMem = args.amMemory + memoryOverhead
     if (amMem > maxMem) {
       logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
         format(args.amMemory, maxMem))
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 78cc343af53b..ac46bd39ce9b 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory((args.amMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt)
+    memoryResource.setMemory(args.amMemory + memoryOverhead)
     appContext.setResource(memoryResource)

     // Finally, submit and monitor the application.
@@ -118,7 +118,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
     //   ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
-    //     YarnAllocationHandler.MEMORY_OVERHEAD)
+    //     memoryOverhead )
     args.amMemory
   }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index df176cc0701b..4304fc9a061c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,6 +90,9 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

+  // Additional memory overhead - in mb.
+  private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -106,8 +109,7 @@ private[yarn] class YarnAllocationHandler(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
-      ceil.toInt
+    container.getResource.getMemory >= (executorMemory + memoryOverhead)
   }

   def releaseContainer(container: Container) {
@@ -249,8 +251,7 @@ private[yarn] class YarnAllocationHandler(
     val executorHostname = container.getNodeId.getHost
     val containerId = container.getId

-    val executorMemoryOverhead = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).
-      ceil.toInt
+    val executorMemoryOverhead = (executorMemory + memoryOverhead)
     assert(container.getResource.getMemory >= executorMemoryOverhead)

     if (numExecutorsRunningNow > maxExecutors) {
@@ -479,7 +480,7 @@ private[yarn] class YarnAllocationHandler(
       numPendingAllocate.addAndGet(numExecutors)
       logInfo("Will Allocate %d executor containers, each with %d memory".format(
         numExecutors,
-        (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt))
+        (executorMemory + memoryOverhead)))
     } else {
       logDebug("Empty allocation request ...")
     }
@@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {

-    val memoryRequest = (executorMemory * YarnAllocationHandler.MEMORY_OVERHEAD).ceil.toInt
+    val memoryRequest = executorMemory + memoryOverhead
    val resource = Resource.newInstance(memoryRequest, executorCores)

     val prioritySetting = Records.newRecord(classOf[Priority])
@@ -560,8 +561,6 @@ object YarnAllocationHandler {
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1

-  // Additional memory overhead.
-  val MEMORY_OVERHEAD = 1.25D
-
   // Host to rack map - saved from allocation requests. We are expecting this not to change.
   // Note that it is possible for this to change : and ResourceManager will indicate that to us via
   // update response to allocate. But we are punting on handling that for now.
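
The sizing rule that PATCH 01 and PATCH 02 converge on is easier to see outside the diffs. The sketch below is illustrative only (it is not part of the patch series, and requestedMemory is a hypothetical helper name): the memory requested from YARN for each container is the requested JVM heap plus a flat, configurable overhead in MB, which replaces the 1.25x multiplier tried in PATCH 01.

    import org.apache.spark.SparkConf

    // Illustrative sketch of the container sizing rule as of PATCH 02.
    // A flat overhead (in MB) is read from the Spark configuration and
    // added on top of the requested JVM heap size.
    def requestedMemory(sparkConf: SparkConf, heapMemoryMb: Int): Int = {
      val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
      heapMemoryMb + memoryOverhead
    }

Requesting heap plus overhead matters because YARN enforces its memory limit against the whole process, including non-heap usage such as VM overheads and interned strings; requesting only the heap is what led containers to be killed for running beyond their limits.
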
From 8fae45a149aadedd58b9591768f87288888ab7e0 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 28 May 2014 14:21:14 +0800 Subject: [PATCH 03/11] fix NullPointerException --- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 771723db4fdb..cce1dfca7906 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -58,9 +58,6 @@ trait ClientBase extends Logging { private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() - // Additional memory overhead - in mb. - val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) - // Staging directory is private! -> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) @@ -68,6 +65,9 @@ trait ClientBase extends Logging { val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) + // Additional memory overhead - in mb. + def memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) + // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { Map( From 9a6bcf256d893277eb8e408a2891546e0528b913 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 28 May 2014 21:37:50 +0800 Subject: [PATCH 04/11] review commit --- .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 6 +++++- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 3 ++- .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 8 +++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index b46e300efcee..bb554b3e3a76 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -89,7 +89,8 @@ private[yarn] class YarnAllocationHandler( private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] // Additional memory overhead - in mb. - private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) + private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor @@ -547,6 +548,9 @@ object YarnAllocationHandler { // request types (like map/reduce in hadoop for example) val PRIORITY = 1 + // Additional memory overhead - in mb + val MEMORY_OVERHEAD = 384 + // Host to rack map - saved from allocation requests // We are expecting this not to change. 
  // Note that it is possible for this to change : and RM will indicate that to us via update
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index cce1dfca7906..c1ff410c34d8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -66,7 +66,8 @@ trait ClientBase extends Logging {
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)

   // Additional memory overhead - in mb.
-  def memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+  def memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)

   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4304fc9a061c..77a517f9811d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -90,8 +90,11 @@ private[yarn] class YarnAllocationHandler(
   // Containers to be released in next request to RM
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

+  // TODO: Here we can dynamically calculate the default value.
+  // eg: val memoryOverhead = (executorMemory * 0.25D).ceil.toInt
   // Additional memory overhead - in mb.
-  private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384)
+  private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)

   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
@@ -561,6 +564,9 @@ object YarnAllocationHandler {
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1

+  // Additional memory overhead - in mb.
+  val MEMORY_OVERHEAD = 384
+
   // Host to rack map - saved from allocation requests. We are expecting this not to change.
   // Note that it is possible for this to change : and ResourceManager will indicate that to us via
   // update response to allocate. But we are punting on handling that for now.

From ffa75696e5ec70b24d8c000bc25db6e84a35b28c Mon Sep 17 00:00:00 2001
From: witgo
Date: Sat, 31 May 2014 15:29:51 +0800
Subject: [PATCH 05/11] review commit

---
 .../spark/deploy/yarn/ExecutorLauncher.scala | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 05e18eb7b7e1..f9a33998e7ad 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -93,21 +93,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()

-    // // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-    // // TODO: Uncomment when hadoop is on a version which has this fixed.
- // val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() - // // Compute number of threads for akka - // val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() - // // Additional memory overhead - in mb. - // val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) - // - // if (minimumMemory > 0) { - // val mem = args.executorMemory + memoryOverhead - // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) - // if (numCore > 0) { - // args.workerCores = numCore - // } - // } + val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() + + // Compute number of threads for akka + val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() + + if (minimumMemory > 0) { + val mem = args.executorMemory + sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) + val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) + + if (numCore > 0) { + // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 + // TODO: Uncomment when hadoop is on a version which has this fixed. + // args.workerCores = numCore + } + } waitForSparkMaster() From e16f1900362f2cb13131c3e29570230041cc6c7f Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 4 Jun 2014 13:37:28 +0800 Subject: [PATCH 06/11] different memoryOverhead --- .../deploy/yarn/YarnAllocationHandler.scala | 12 ++++++++++-- .../apache/spark/deploy/yarn/ClientBase.scala | 12 ++++++++++-- .../deploy/yarn/YarnAllocationHandler.scala | 17 +++++++++++++---- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index bb554b3e3a76..13001438afcf 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -89,8 +89,16 @@ private[yarn] class YarnAllocationHandler( private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] // Additional memory overhead - in mb. - private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + private def memoryOverhead: Int = { + var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD + sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + sparkConf.getOption(s"spark.yarn.executor.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + defaultMemoryOverhead + } private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index c1ff410c34d8..ced4ed266f0f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -66,8 +66,16 @@ trait ClientBase extends Logging { FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) // Additional memory overhead - in mb. 
- def memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + protected def memoryOverhead: Int = { + var defaultMemoryOverhead = (args.amMemory * 0.25D).ceil.toInt + sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + sparkConf.getOption(s"spark.yarn.driver.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + defaultMemoryOverhead + } // TODO(harvey): This could just go in ClientArguments. def validateArgs() = { diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 77a517f9811d..233a3f8433ec 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -90,11 +90,20 @@ private[yarn] class YarnAllocationHandler( // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] - // TODO: Here we can dynamically calculate the default value. - // eg: val memoryOverhead = (executorMemory * 0.25D).ceil.toInt // Additional memory overhead - in mb. - private val memoryOverhead = sparkConf.getInt("spark.yarn.container.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + private def memoryOverhead: Int = { + // TODO: Here we can dynamically calculate the default value. + // eg: val defaultMemoryOverhead = (executorMemory * 0.25D).ceil.toInt + var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD + sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + sparkConf.getOption(s"spark.yarn.executor.memoryOverhead").foreach { s => + defaultMemoryOverhead = s.toInt + } + defaultMemoryOverhead + } + // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. From e3c531d515abb090bf954c644ec7d5897c44b5ea Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 6 Jun 2014 17:33:41 +0800 Subject: [PATCH 07/11] review commit --- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index ced4ed266f0f..1c5a5007c6e5 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -67,7 +67,7 @@ trait ClientBase extends Logging { // Additional memory overhead - in mb. 
protected def memoryOverhead: Int = { - var defaultMemoryOverhead = (args.amMemory * 0.25D).ceil.toInt + var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } From 655a820312c2998d934288a6ad8bd3fe75bbe62f Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 6 Jun 2014 17:42:19 +0800 Subject: [PATCH 08/11] review commit --- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 13001438afcf..1d3a019b53b1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -94,7 +94,7 @@ private[yarn] class YarnAllocationHandler( sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } - sparkConf.getOption(s"spark.yarn.executor.memoryOverhead").foreach { s => + sparkConf.getOption("spark.yarn.executor.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } defaultMemoryOverhead diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 1c5a5007c6e5..dc8429e5fd08 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -71,7 +71,7 @@ trait ClientBase extends Logging { sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } - sparkConf.getOption(s"spark.yarn.driver.memoryOverhead").foreach { s => + sparkConf.getOption("spark.yarn.driver.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } defaultMemoryOverhead diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 233a3f8433ec..a517dc8efb22 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -98,7 +98,7 @@ private[yarn] class YarnAllocationHandler( sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } - sparkConf.getOption(s"spark.yarn.executor.memoryOverhead").foreach { s => + sparkConf.getOption("spark.yarn.executor.memoryOverhead").foreach { s => defaultMemoryOverhead = s.toInt } defaultMemoryOverhead From a0ff545715f87dc82be36b0a76a0da5c5edce24d Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 13 Jun 2014 23:03:55 +0800 Subject: [PATCH 09/11] leaving only two configs --- .../spark/deploy/yarn/ExecutorLauncher.scala | 3 ++- .../spark/deploy/yarn/YarnAllocationHandler.scala | 12 ++---------- .../org/apache/spark/deploy/yarn/ClientBase.scala | 12 ++---------- .../spark/deploy/yarn/YarnAllocationHandler.scala | 15 ++------------- 4 files changed, 8 insertions(+), 34 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 0134365297e4..bfdb6232f511 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -99,7 +99,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() if (minimumMemory > 0) { - val mem = args.executorMemory + sparkConf.getInt("spark.yarn.container.memoryOverhead", 384) + val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) if (numCore > 0) { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 1d3a019b53b1..80e0162e9f27 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -89,16 +89,8 @@ private[yarn] class YarnAllocationHandler( private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] // Additional memory overhead - in mb. - private def memoryOverhead: Int = { - var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD - sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => - defaultMemoryOverhead = s.toInt - } - sparkConf.getOption("spark.yarn.executor.memoryOverhead").foreach { s => - defaultMemoryOverhead = s.toInt - } - defaultMemoryOverhead - } + private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) private val numExecutorsRunning = new AtomicInteger() // Used to generate a unique id per executor diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index ef0b6453d685..858bcaa95b40 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -66,16 +66,8 @@ trait ClientBase extends Logging { FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) // Additional memory overhead - in mb. - protected def memoryOverhead: Int = { - var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD - sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s => - defaultMemoryOverhead = s.toInt - } - sparkConf.getOption("spark.yarn.driver.memoryOverhead").foreach { s => - defaultMemoryOverhead = s.toInt - } - defaultMemoryOverhead - } + protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + YarnAllocationHandler.MEMORY_OVERHEAD) // TODO(harvey): This could just go in ClientArguments. 
   def validateArgs() = {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a517dc8efb22..29ccec2adcac 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -91,19 +91,8 @@ private[yarn] class YarnAllocationHandler(
   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

   // Additional memory overhead - in mb.
-  private def memoryOverhead: Int = {
-    // TODO: Here we can dynamically calculate the default value.
-    // eg: val defaultMemoryOverhead = (executorMemory * 0.25D).ceil.toInt
-    var defaultMemoryOverhead = YarnAllocationHandler.MEMORY_OVERHEAD
-    sparkConf.getOption("spark.yarn.container.memoryOverhead").foreach { s =>
-      defaultMemoryOverhead = s.toInt
-    }
-    sparkConf.getOption("spark.yarn.executor.memoryOverhead").foreach { s =>
-      defaultMemoryOverhead = s.toInt
-    }
-    defaultMemoryOverhead
-  }
-
+  private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)

   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.

From 172647bb762dc16aa12b06cd970380d95853ec62 Mon Sep 17 00:00:00 2001
From: witgo
Date: Sat, 14 Jun 2014 00:22:38 +0800
Subject: [PATCH 10/11] add memoryOverhead docs

---
 docs/running-on-yarn.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index af1788f2aa15..d892d640e867 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td>spark.yarn.executor.memoryOverhead</td>
+  <td>384</td>
+  <td>
+    Additional memory overhead in mb for executor.
+  </td>
+</tr>
+<tr>
+  <td>spark.yarn.driver.memoryOverhead</td>
+  <td>384</td>
+  <td>
+    Additional memory overhead in mb for driver.
+  </td>
+</tr>
 </table>

 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.

From 564307e1ed962a7ff4927a20fbee29eec1fa332e Mon Sep 17 00:00:00 2001
From: witgo
Date: Mon, 16 Jun 2014 23:15:03 +0800
Subject: [PATCH 11/11] Update the running-on-yarn.md

---
 docs/running-on-yarn.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index d892d640e867..4243ef480ba3 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -71,14 +71,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 <tr>
   <td>spark.yarn.executor.memoryOverhead</td>
   <td>384</td>
   <td>
-    Additional memory overhead in mb for executor.
+    The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 <tr>
   <td>spark.yarn.driver.memoryOverhead</td>
   <td>384</td>
   <td>
-    Additional memory overhead in mb for driver.
+    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
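
For reference, a minimal sketch of how an application would set the two options documented above; the 512 values are arbitrary examples, and the same keys can also be set in spark-defaults.conf:

    import org.apache.spark.SparkConf

    // Reserve 512 MB of off-heap headroom per container rather than the
    // 384 MB default introduced by this series; the values are arbitrary.
    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "512")
      .set("spark.yarn.driver.memoryOverhead", "512")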