From e4653da61e23ec45aff3cd34f3466d3ee2e3f85b Mon Sep 17 00:00:00 2001
From: "twinkle.sachdeva"
Date: Thu, 24 Jul 2014 15:41:58 +0530
Subject: [PATCH] SPARK-2604: Incorporate the executor memory overhead during
 the verification of cluster resources, before starting the application

---
 .../org/apache/spark/deploy/yarn/Client.scala |  2 +-
 .../apache/spark/deploy/yarn/ClientBase.scala | 29 +++++++++++--------
 .../org/apache/spark/deploy/yarn/Client.scala |  2 +-
 3 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 82f79d88a3009..a7e240d18fbbb 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
     // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + memoryOverhead)
+    capability.setMemory(args.amMemory + amMemoryOverhead)
     amContainer.setResource(capability)
 
     appContext.setQueue(args.amQueue)
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index a1298e8f30b5c..64fb6bedb1252 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -65,8 +65,12 @@ trait ClientBase extends Logging {
   val APP_FILE_PERMISSION: FsPermission =
     FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
-  // Additional memory overhead - in mb.
-  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+  // Additional memory overhead for the ApplicationMaster, in MB.
+  protected def amMemoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+    YarnAllocationHandler.MEMORY_OVERHEAD)
+
+  // Additional memory overhead for each executor, in MB.
+  private def execMemoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     YarnAllocationHandler.MEMORY_OVERHEAD)
 
   // TODO(harvey): This could just go in ClientArguments.
@@ -76,10 +80,10 @@ trait ClientBase extends Logging {
       "Error: You must specify a user jar when running in standalone mode!"),
     (args.userClass == null) -> "Error: You must specify a user class!",
     (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-    (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
-      "greater than: " + memoryOverhead),
-    (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
-      "must be greater than: " + memoryOverhead.toString)
+    (args.amMemory <= amMemoryOverhead) -> ("Error: AM memory size must be " +
+      "greater than: " + amMemoryOverhead),
+    (args.executorMemory <= execMemoryOverhead) -> ("Error: Executor memory size " +
+      "must be greater than: " + execMemoryOverhead.toString)
   ).foreach { case(cond, errStr) =>
     if (cond) {
       logError(errStr)
@@ -97,19 +101,20 @@ trait ClientBase extends Logging {
     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
 
     // If we have requested more then the clusters max for a single resource then exit.
-    if (args.executorMemory > maxMem) {
+    val execMem = args.executorMemory + execMemoryOverhead
+    if (execMem > maxMem) {
       val errorMessage =
-        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
-          .format(args.executorMemory, maxMem)
+        "Required executor memory (%d MB) along with overhead (%d MB) is above the max threshold (%d MB) of this cluster."
+          .format(args.executorMemory, execMemoryOverhead, maxMem)
       logError(errorMessage)
       throw new IllegalArgumentException(errorMessage)
     }
 
-    val amMem = args.amMemory + memoryOverhead
+    val amMem = args.amMemory + amMemoryOverhead
     if (amMem > maxMem) {
-      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
-        .format(args.amMemory, maxMem)
+      val errorMessage = "Required AM memory (%d MB) along with overhead (%d MB) is above the max threshold (%d MB) of this cluster."
+        .format(args.amMemory, amMemoryOverhead, maxMem)
       logError(errorMessage)
       throw new IllegalArgumentException(errorMessage)
     }
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15f3c4f180ea3..bcbe24ae309d3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // Memory for the ApplicationMaster.
     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    memoryResource.setMemory(args.amMemory + memoryOverhead)
+    memoryResource.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(memoryResource)
 
     // Finally, submit and monitor the application.
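
---
Reviewer note (not part of the patch): below is a minimal, self-contained Scala
sketch of the verification this change implements. The object name, parameters,
and the values in main() are hypothetical stand-ins; in the real code path,
maxMem comes from YARN's maximum container allocation, and the overheads come
from "spark.yarn.driver.memoryOverhead" / "spark.yarn.executor.memoryOverhead",
defaulting to YarnAllocationHandler.MEMORY_OVERHEAD.

// Sketch only: names and values are illustrative, not Spark's actual API.
object MemoryCheckSketch {

  // Mirrors the patched check: each requested container size must cover the
  // JVM heap plus the off-heap overhead that YARN accounts against it.
  def verifyClusterResources(
      executorMemory: Int,      // requested executor memory, in MB
      amMemory: Int,            // requested ApplicationMaster memory, in MB
      execMemoryOverhead: Int,  // overhead added per executor, in MB
      amMemoryOverhead: Int,    // overhead added for the AM, in MB
      maxMem: Int): Unit = {    // largest container the cluster will grant, in MB
    val execMem = executorMemory + execMemoryOverhead
    if (execMem > maxMem) {
      throw new IllegalArgumentException(
        "Required executor memory (%d MB) along with overhead (%d MB) is above the max threshold (%d MB) of this cluster."
          .format(executorMemory, execMemoryOverhead, maxMem))
    }
    val amMem = amMemory + amMemoryOverhead
    if (amMem > maxMem) {
      throw new IllegalArgumentException(
        "Required AM memory (%d MB) along with overhead (%d MB) is above the max threshold (%d MB) of this cluster."
          .format(amMemory, amMemoryOverhead, maxMem))
    }
  }

  def main(args: Array[String]): Unit = {
    // 7680 + 384 = 8064 MB fits in an 8192 MB container, so this passes.
    verifyClusterResources(7680, 512, 384, 384, 8192)
    // 8000 + 384 = 8384 MB exceeds 8192 MB; before this patch the bare
    // 8000 MB request would have passed the check and failed only later,
    // when YARN refused the container.
    try {
      verifyClusterResources(8000, 512, 384, 384, 8192)
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}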