Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + memoryOverhead)
capability.setMemory(args.amMemory + amMemoryOverhead)
amContainer.setResource(capability)

appContext.setQueue(args.amQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ trait ClientBase extends Logging {
val APP_FILE_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)

// Additional memory overhead - in mb.
protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
// Additional memory overhead for Application Master - in mb.
protected def amMemoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// Additional memory overhead for Executor - in mb.
private def execMemoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// TODO(harvey): This could just go in ClientArguments.
Expand All @@ -76,10 +80,10 @@ trait ClientBase extends Logging {
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + memoryOverhead),
(args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
"must be greater than: " + memoryOverhead.toString)
(args.amMemory <= amMemoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + amMemoryOverhead),
(args.executorMemory <= execMemoryOverhead) -> ("Error: Executor memory size" +
"must be greater than: " + execMemoryOverhead.toString)
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
Expand All @@ -97,19 +101,20 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)

// If we have requested more then the clusters max for a single resource then exit.
if (args.executorMemory > maxMem) {
val execMem = args.executorMemory + execMemoryOverhead
if (execMem > maxMem) {
val errorMessage =
"Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
.format(args.executorMemory, maxMem)
"Required executor memory (%d MB) along with overhead ( %d MB), is above the max threshold (%d MB) of this cluster."
.format(args.executorMemory, execMemoryOverhead, maxMem)

logError(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
val amMem = args.amMemory + memoryOverhead
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {

val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
.format(args.amMemory, maxMem)
val errorMessage = "Required AM memory (%d) alng with overhead, is above the max threshold (%d) of this cluster."
.format(args.amMemory, amMemoryOverhead, maxMem)
logError(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

// Memory for the ApplicationMaster.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
memoryResource.setMemory(args.amMemory + memoryOverhead)
memoryResource.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(memoryResource)

// Finally, submit and monitor the application.
Expand Down