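This change removes direct `System.exit` calls from the YARN client path: both `Client.scala` variants (the alpha and stable YARN implementations), `ClientArguments.scala`, and the shared `ClientBase` trait. Validation and setup failures now throw `IllegalArgumentException` or `SparkException`; only `Client.main` converts a failure into a process exit, printing the message to stderr and exiting 1, or exiting 0 on success.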
--- a/Client.scala
+++ b/Client.scala
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
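With `System.exit(0)` gone from `run()` and the exit logic confined to `main()`, the client can be embedded in a larger JVM without the host process being killed on completion or failure. A minimal sketch of such an embedding, assuming the two-argument `Client` constructor used by `main()` above (`EmbeddedSubmit` itself is hypothetical, not part of this change):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

// Hypothetical embedding sketch: because run() no longer calls System.exit,
// a host application can submit to YARN and react to failures itself.
object EmbeddedSubmit {
  def main(argStrings: Array[String]): Unit = {
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf
    try {
      val args = new ClientArguments(argStrings, sparkConf)
      new Client(args, sparkConf).run()
      println("Application finished")
    } catch {
      case e: Exception =>
        // The host JVM keeps running; log and recover instead of dying.
        Console.err.println(s"YARN submission failed: ${e.getMessage}")
    }
  }
}
```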
--- a/ClientArguments.scala
+++ b/ClientArguments.scala
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
       case Nil =>
         if (userClass == null) {
-          printUsageAndExit(1)
+          throw new IllegalArgumentException(getUsageMessage())
        }
 
       case _ =>
-        printUsageAndExit(1, args)
+        throw new IllegalArgumentException(getUsageMessage(args))
     }
   }
 
@@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   }
 
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars           Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files            Comma separated list of files to be distributed with the job.\n" +
       "  --archives archives      Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
   }
-
 }
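Because `getUsageMessage` returns the text and the parser throws instead of exiting, argument validation becomes unit-testable; previously an unknown flag would have terminated the test JVM. A sketch of what such a test could look like, assuming a ScalaTest `FunSuite` (the suite and the `--bogus` flag are illustrative, not part of this change):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments
import org.scalatest.FunSuite

// Hypothetical test sketch: constructing ClientArguments with an unknown
// flag now surfaces as an exception the test can intercept, rather than
// a System.exit(1) that would tear down the test runner.
class ClientArgumentsSuite extends FunSuite {
  test("unknown option raises IllegalArgumentException with usage text") {
    val e = intercept[IllegalArgumentException] {
      new ClientArguments(Array("--bogus", "value"), new SparkConf)
    }
    assert(e.getMessage.contains("Usage: org.apache.spark.deploy.yarn.Client"))
  }
}
```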
--- a/ClientBase.scala
+++ b/ClientBase.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -94,15 +94,20 @@ trait ClientBase extends Logging {
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.executorMemory > maxMem) {
-      logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.executorMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
+          .format(args.executorMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -186,8 +191,9 @@ trait ClientBase extends Logging {
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new SparkException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
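Both memory checks in `ClientBase` now follow the same check, log, throw shape. Distilled into a standalone helper for clarity (the helper and its signature are assumptions for illustration, not code from this change):

```scala
// Illustrative distillation of the new check-log-throw pattern used for
// both the executor and ApplicationMaster memory validations.
object MemoryChecks {
  def validate(requestedMB: Int, overheadMB: Int, maxMB: Int, what: String): Unit = {
    val totalMB = requestedMB + overheadMB
    if (totalMB > maxMB) {
      val errorMessage =
        "Required %s memory (%d MB) is above the max threshold (%d MB) of this cluster"
          .format(what, totalMB, maxMB)
      // ClientBase also calls logError(errorMessage) before throwing.
      throw new IllegalArgumentException(errorMessage)
    }
  }
}
```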
--- a/Client.scala
+++ b/Client.scala
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -186,9 +185,18 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run()
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 
 }
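The net effect on the command line is unchanged: `main()` still exits 0 on success and prints the failure message to stderr before exiting 1. What changes is that every failure now travels as an exception until it reaches `main()`, so programmatic callers get a chance to handle it instead of having their JVM terminated.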