From 138cb48f7bcd3eff60865df516925ff0b96d1ad9 Mon Sep 17 00:00:00 2001
From: John Zhao
Date: Tue, 22 Apr 2014 15:58:35 -0700
Subject: [PATCH] [SPARK-1516] Throw exception in yarn client instead of running System.exit directly.

All the changes are in the package "org.apache.spark.deploy.yarn":
1) Add a ClientException with an exitCode.
2) Throw exceptions in ClientArguments and ClientBase instead of exiting directly.
3) In Client's main method, catch the exception and exit with its exitCode.

After the fix, applications that integrate the Spark YARN client no longer exit
when an argument is wrong or when the run finishes; System.exit is only called
when the client is run from the command line.
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 +++++++---
 .../spark/deploy/yarn/ClientArguments.scala        | 16 +++++-------
 .../apache/spark/deploy/yarn/ClientBase.scala      | 26 ++++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 +++++++---
 4 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207de42b8..4ccddc214c8ad 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    val args = new ClientArguments(argStrings, sparkConf)
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b2c413b6d267c..fd3ef9e1fa2de 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
       case Nil =>
         if (userClass == null) {
-          printUsageAndExit(1)
+          throw new IllegalArgumentException(getUsageMessage())
         }
 
       case _ =>
-        printUsageAndExit(1, args)
+        throw new IllegalArgumentException(getUsageMessage(args))
     }
   }
@@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   }
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file (required in yarn-cluster mode)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files              Comma separated list of files to be distributed with the job.\n" +
      "  --archives archives        Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
   }
-
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 29a35680c0e72..6861b503000ca 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -94,15 +94,20 @@
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.executorMemory > maxMem) {
-      logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.executorMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
+          .format(args.executorMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -186,8 +191,9 @@
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new SparkException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 24027618c1f35..80a8bceb17269 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -186,9 +185,18 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
 
-    val args = new ClientArguments(argStrings, sparkConf)
-    new Client(args, sparkConf).run()
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
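
For reference, a minimal sketch of how an embedding application can now react to a failed submission instead of having its JVM terminated. The EmbeddedSubmitter wrapper below is hypothetical and not part of this patch; Client, ClientArguments, and SparkConf are used the same way as in Client.main() above, and after this change a bad argument surfaces as an IllegalArgumentException (other client failures as SparkException):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

// Hypothetical wrapper, for illustration only.
object EmbeddedSubmitter {
  def submit(argStrings: Array[String]): Boolean = {
    // Same setup as Client.main(), but without any System.exit on failure.
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    try {
      val args = new ClientArguments(argStrings, sparkConf)
      new Client(args, sparkConf).run()
      true
    } catch {
      case e: Exception =>
        // Before this patch this path would have killed the host JVM via System.exit(1);
        // now the caller just sees the error and keeps running.
        Console.err.println("YARN client failed: " + e.getMessage)
        false
    }
  }
}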