
Commit f95ac68

codeboyyong authored and mengxr committed
[SPARK-1516] Throw exception in YARN client instead of calling System.exit directly.
All the changes are in the package "org.apache.spark.deploy.yarn":
1) Throw exceptions in ClientArguments and ClientBase instead of exiting directly.
2) In Client's main method, exit with code 1 if an exception is caught, and with code 0 otherwise.

After the fix, users who integrate the Spark YARN client into their own applications will no longer see their application terminated when an argument is wrong or when the run finishes.

Author: John Zhao <[email protected]>

Closes apache#490 from codeboyyong/jira_1516_systemexit_inyarnclient and squashes the following commits:

138cb48 [John Zhao] [SPARK-1516] Throw exception in YARN client instead of calling System.exit directly. All the changes are in the package "org.apache.spark.deploy.yarn": 1) Add a ClientException with an exitCode. 2) Throw exceptions in ClientArguments and ClientBase instead of exiting directly. 3) In Client's main method, catch the exception and exit with its exitCode.
1 parent 44daec5 · commit f95ac68

File tree

4 files changed: +44 −26 lines
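The practical effect is easiest to see from the caller's side. Below is a minimal sketch of an application that embeds the YARN client; `EmbeddingApp` and `submitToYarn` are hypothetical names used only for illustration, and the sketch assumes the two-argument `Client` constructor exercised in the diffs below.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

// Hypothetical embedding application (illustrative only, not part of this commit).
object EmbeddingApp {
  def submitToYarn(argStrings: Array[String]): Boolean = {
    val sparkConf = new SparkConf
    try {
      // After this commit, a bad argument raises IllegalArgumentException here
      // instead of printing usage and killing the whole JVM via System.exit(1).
      val args = new ClientArguments(argStrings, sparkConf)
      new Client(args, sparkConf).run()
      true // submission and monitoring completed normally
    } catch {
      case e: Exception =>
        // The failure is now observable; the embedding JVM keeps running.
        Console.err.println(s"YARN submission failed: ${e.getMessage}")
        false
    }
  }
}
```

Before this change, the same bad argument would have terminated the embedding JVM outright, leaving the caller no chance to react.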

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 11 additions & 3 deletions

```diff
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
```
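The shape of this change generalizes: library code signals failure by throwing, and only the outermost `main` translates success or failure into a process exit code. A self-contained sketch of the pattern, with all names hypothetical:

```scala
// Hypothetical, self-contained illustration of the "exit only in main" pattern.
object ExitAtTheEdge {
  // Library-style code: reports failure by throwing, never by System.exit.
  private def doWork(arg: String): Unit = {
    if (arg.isEmpty) throw new IllegalArgumentException("argument must be non-empty")
    println(s"worked on: $arg")
  }

  def main(args: Array[String]): Unit = {
    try {
      doWork(args.headOption.getOrElse(""))
    } catch {
      case e: Exception =>
        Console.err.println(e.getMessage)
        System.exit(1) // failure path: non-zero exit decided only here
    }
    System.exit(0) // success path: reached only if no exception escaped
  }
}
```

Note that the trailing `System.exit(0)` runs only on success: the catch branch never returns normally, because it exits with code 1 first.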

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala

Lines changed: 6 additions & 10 deletions

```diff
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
       case Nil =>
         if (userClass == null) {
-          printUsageAndExit(1)
+          throw new IllegalArgumentException(getUsageMessage())
         }
 
       case _ =>
-        printUsageAndExit(1, args)
+        throw new IllegalArgumentException(getUsageMessage(args))
     }
   }
 
@@ -138,11 +138,10 @@
   }
 
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars           Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files            Comma separated list of files to be distributed with the job.\n" +
       "  --archives archives      Comma separated list of archives to be distributed with the job."
-    )
-    System.exit(exitCode)
   }
-
 }
```
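Replacing the side-effecting `printUsageAndExit` with a pure `getUsageMessage` also makes the usage text reusable and testable. A sketch of the same refactor on a hypothetical `ArgParser` (names and option text are illustrative, not from this commit):

```scala
// Hypothetical ArgParser demonstrating the print-and-exit -> pure-message refactor.
object ArgParser {
  // Pure: builds the message; callers decide whether to print, throw, or test it.
  def getUsageMessage(unknownParam: Any = null): String = {
    val prefix = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
    prefix +
      "Usage: ArgParser [options]\n" +
      "Options:\n" +
      "  --jar JAR_PATH   Path to your application's JAR file"
  }

  def parse(args: List[String]): Unit = args match {
    case Nil  => () // nothing left to parse
    case rest => throw new IllegalArgumentException(getUsageMessage(rest))
  }

  def main(args: Array[String]): Unit = {
    // Pure functions are easy to unit-test; there is no exit code to intercept.
    assert(getUsageMessage("--bogus").startsWith("Unknown/unsupported param --bogus"))
    println(getUsageMessage())
  }
}
```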

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 16 additions & 10 deletions

```diff
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -94,15 +94,20 @@
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.executorMemory > maxMem) {
-      logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.executorMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
+          .format(args.executorMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -186,8 +191,9 @@
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new SparkException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
```
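A recurring detail in these hunks: the formatted message is bound to a `val` once, so the log entry and the exception carry identical text. A minimal sketch of that "bind once, log, then throw" pattern, using a hypothetical `MemoryChecker` (`Logging` is the Spark trait already imported by ClientBase):

```scala
import org.apache.spark.Logging

// Hypothetical class showing the "bind once, log, then throw" pattern.
class MemoryChecker extends Logging {
  def checkExecutorMemory(requestedMb: Int, maxMb: Int): Unit = {
    if (requestedMb > maxMb) {
      // Binding the message once keeps the log line and the exception in sync.
      val errorMessage =
        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
          .format(requestedMb, maxMb)
      logError(errorMessage)
      throw new IllegalArgumentException(errorMessage)
    }
  }
}
```

The resource checks throw IllegalArgumentException while the Kerberos check throws SparkException, presumably because a missing renewer principal is an environment failure rather than a caller mistake.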

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 11 additions & 3 deletions

```diff
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -186,9 +185,18 @@
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run()
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 
 }
```
