@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.net.{InetAddress, URI, UnknownHostException}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{Apps, Records}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -64,7 +64,6 @@ trait ClientBase extends Logging {
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
-      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
           "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
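
With the SPARK_JAR check removed, `validateArgs` keeps the same condition-to-message shape. A self-contained sketch of that pattern follows, with illustrative names and messages (not the PR's exact code); it uses a `Seq` of pairs rather than a `Boolean`-keyed `Map`, since a `Map` would retain at most one `true` entry:

```scala
// Illustrative sketch of condition -> message validation.
object ValidationSketch {
  def firstError(userJar: String, userClass: String): Option[String] = {
    val checks: Seq[(Boolean, String)] = Seq(
      (userJar == null) -> "Error: You must specify a user jar when running in standalone mode!",
      (userClass == null) -> "Error: You must specify a user class!"
    )
    // Report the first check whose condition holds.
    checks.collectFirst { case (true, msg) => msg }
  }

  def main(args: Array[String]): Unit = {
    // Both checks fail here, so the first message is printed.
    firstError(null, null).foreach(System.err.println)
  }
}
```
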
@@ -202,7 +201,7 @@ trait ClientBase extends Logging {
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
     Map(
-      ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
+      ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar,
       ClientBase.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
     ).foreach { case (destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -306,13 +305,13 @@ trait ClientBase extends Logging {
 
     val amMemory = calculateAMMemory(newApp)
 
-    var JAVA_OPTS = ""
+    var JAVA_OPTS = ListBuffer[String]()
 
     // Add Xmx for AM memory
     JAVA_OPTS += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += " -Djava.io.tmpdir=" + tmpDir
+    JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc -
@@ -326,24 +325,24 @@ trait ClientBase extends Logging {
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+      JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
+      JAVA_OPTS += "-XX:+CMSIncrementalMode"
+      JAVA_OPTS += "-XX:+CMSIncrementalPacing"
+      JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
+      JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
     }
 
     if (args.amClass == classOf[ExecutorLauncher].getName) {
       // If we are being launched in client mode, forward the spark-conf options
       // onto the executor launcher
       for ((k, v) <- sparkConf.getAll) {
-        JAVA_OPTS += s" -D$k=$v"
+        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
     } else {
       // If we are being launched in standalone mode, capture and forward any spark
       // system properties (e.g. set by spark-class).
       for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
-        JAVA_OPTS += s" -D$k=$v"
+        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
       sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
       sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
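
A minimal sketch of what the `ListBuffer` change buys: each JVM option stays a separate token, and the `"\\\""` escaping wraps property values so embedded spaces survive when the tokens are later joined into one shell command line. The joining-with-spaces step is an assumption about how the launcher materializes the container command; the property name and value below are made up:

```scala
import scala.collection.mutable.ListBuffer

object JavaOptsSketch {
  def main(args: Array[String]): Unit = {
    val javaOpts = ListBuffer[String]()
    javaOpts += "-Xmx512m"

    // Hypothetical property whose value contains a space.
    val (k, v) = ("spark.app.name", "my app")
    // "\\\"" is a literal backslash followed by a quote, i.e. \" in the
    // output, so the value stays one shell word when re-parsed.
    javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""

    // Assumed joining step: tokens concatenated with spaces.
    println(javaOpts.mkString(" "))
    // Prints: -Xmx512m -Dspark.app.name=\"my app\"
  }
}
```
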
@@ -354,22 +353,21 @@ trait ClientBase extends Logging {
     }
 
     // Command for the ApplicationMaster
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-      " -server " +
-      JAVA_OPTS +
-      " " + args.amClass +
-      " --class " + args.userClass +
-      " --jar " + args.userJar +
-      userArgsToString(args) +
-      " --executor-memory " + args.executorMemory +
-      " --executor-cores " + args.executorCores +
-      " --num-executors " + args.numExecutors +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    logInfo("Command for starting the Spark ApplicationMaster: " + commands(0))
-    amContainer.setCommands(commands)
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+      JAVA_OPTS ++
+      Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
+        userArgsToString(args),
+        "--executor-memory", args.executorMemory.toString,
+        "--executor-cores", args.executorCores.toString,
+        "--num-executors ", args.numExecutors.toString,
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    logInfo("Command for starting the Spark ApplicationMaster: " + commands)
+
+    // TODO: it would be nicer to just make sure there are no null commands here
+    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
+    amContainer.setCommands(printableCommands)
 
     setupSecurityToken(amContainer)
     amContainer
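
The command is now assembled as a `Seq[String]`, and null entries are stringified before `setCommands`, per the TODO above. A rough, self-contained sketch of the same assembly with made-up argument values (the paths and class names below are placeholders, not from the PR):

```scala
object AmCommandSketch {
  def main(args: Array[String]): Unit = {
    val javaOpts = Seq("-Xmx512m", "-Djava.io.tmpdir=./tmp")
    val userJar: String = null // unset on purpose, to exercise the null-guard

    val commands = Seq("/usr/lib/jvm/java/bin/java", "-server") ++
      javaOpts ++
      Seq("org.example.ApplicationMaster", // hypothetical AM class
        "--class", "org.example.UserApp",  // hypothetical user class
        "--jar", userJar,
        "--executor-memory", "1024",
        "--num-executors", "2",
        "1>", "<LOG_DIR>/stdout",
        "2>", "<LOG_DIR>/stderr")

    // Same null-guard as the diff: replace nulls with the literal string
    // "null" before handing the list over, instead of failing later.
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    println(printableCommands.mkString(" "))
  }
}
```
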
@@ -381,6 +379,8 @@ object ClientBase {
   val APP_JAR: String = "app.jar"
   val LOG4J_PROP: String = "log4j.properties"
 
+  def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
+
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
     val classpathEntries = Option(conf.getStrings(
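
`getSparkJar` makes SPARK_JAR optional: the environment variable wins when set, and otherwise `SparkContext.jarOfClass` locates the jar that `ClientBase` itself was loaded from (note `.head` would throw if that lookup comes back empty, e.g. when running from unpacked classes). A hedged sketch of the same fallback shape, using only the standard library rather than the Spark API:

```scala
object SparkJarSketch {
  // Environment variable first, then the jar this class was loaded from.
  // Returns Option instead of calling .head, so an empty lookup yields
  // None rather than an exception.
  def locateJar: Option[String] =
    sys.env.get("SPARK_JAR").orElse {
      Option(getClass.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .filter(_.endsWith(".jar"))
    }

  def main(args: Array[String]): Unit =
    println(locateJar.getOrElse("no jar found"))
}
```
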
@@ -433,9 +433,9 @@ object ClientBase {
       extraClassPath: Option[String] = None) {
 
     /** Add entry to the classpath. */
-    def addClasspathEntry(entry: String) = Apps.addToEnvironment(env, Environment.CLASSPATH.name, entry)
+    def addClasspathEntry(path: String) = Apps.addToEnvironment(env, Environment.CLASSPATH.name, path)
     /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
-    def addPwdClasspathEntry(path: String) = addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
+    def addPwdClasspathEntry(entry: String) = addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
 
     extraClassPath.foreach(addClasspathEntry)
 