@@ -65,7 +65,7 @@ private[spark] class Client(
   private val amMemoryOverhead = args.amMemoryOverhead // MB
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val isLaunchingDriver = args.userClass != null
+  private val isClusterMode = args.userClass != null


   def stop(): Unit = yarnClient.stop()
@@ -179,7 +179,7 @@ private[spark] class Client(
    * The file is only copied if the source and destination file systems are different. This is used
    * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
    */
-  def copyFileToRemote(
+  private[yarn] def copyFileToRemote(
       destDir: Path,
       srcPath: Path,
       replication: Short,
@@ -205,22 +205,6 @@ private[spark] class Client(
     fc.resolvePath(qualifiedDestPath)
   }

-  /**
-   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
-   * This is used for preparing local resources to be included in the container launch context.
-   */
-  private def getQualifiedLocalPath(localURI: URI): Path = {
-    val qualifiedURI =
-      if (localURI.getScheme == null) {
-        // If not specified, assume this is in the local filesystem to keep the behavior
-        // consistent with that of Hadoop
-        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
-      } else {
-        localURI
-      }
-    new Path(qualifiedURI)
-  }
-
   /**
    * Upload any resources to the distributed cache if needed. If a resource is intended to be
    * consumed locally, set up the appropriate config for downstream code to handle it properly.
@@ -269,7 +253,7 @@ private[spark] class Client(
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
-          val src = getQualifiedLocalPath(localURI)
+          val src = getQualifiedLocalPath(localURI, hadoopConf)
           val destPath = copyFileToRemote(dst, src, replication, setPermissions)
           val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
           distCacheMgr.addResource(destFs, hadoopConf, destPath,
@@ -360,7 +344,7 @@ private[spark] class Client(
     // Note that to warn the user about the deprecation in cluster mode, some code from
     // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
     // described above).
-    if (isLaunchingDriver) {
+    if (isClusterMode) {
       sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
         val warning =
           s"""
@@ -440,7 +424,7 @@ private[spark] class Client(
     }

     // Include driver-specific java options if we are launching a driver
-    if (isLaunchingDriver) {
+    if (isClusterMode) {
       sparkConf.getOption("spark.driver.extraJavaOptions")
         .orElse(sys.env.get("SPARK_JAVA_OPTS"))
         .map(Utils.splitCommandString).getOrElse(Seq.empty)
@@ -474,7 +458,7 @@ private[spark] class Client(
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

     val userClass =
-      if (isLaunchingDriver) {
+      if (isClusterMode) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
@@ -486,7 +470,7 @@ private[spark] class Client(
        Nil
      }
     val amClass =
-      if (isLaunchingDriver) {
+      if (isClusterMode) {
        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
@@ -623,7 +607,7 @@ private[spark] class Client(
   }
 }

-private[spark] object Client extends Logging {
+object Client extends Logging {
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -699,7 +683,8 @@ private[spark] object Client extends Logging {
    * Populate the classpath entry in the given environment map with any application
    * classpath specified through the Hadoop and Yarn configurations.
    */
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = {
+  private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
+    : Unit = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
     for (c <- classPathElementsToAdd.flatten) {
       YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
@@ -925,4 +910,20 @@ private[spark] object Client extends Logging {
     Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
   }

+  /**
+   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
+   * This is used for preparing local resources to be included in the container launch context.
+   */
+  private def getQualifiedLocalPath(localURI: URI, hadoopConf: Configuration): Path = {
+    val qualifiedURI =
+      if (localURI.getScheme == null) {
+        // If not specified, assume this is in the local filesystem to keep the behavior
+        // consistent with that of Hadoop
+        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
+      } else {
+        localURI
+      }
+    new Path(qualifiedURI)
+  }
+
 }
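
For reference, a minimal standalone sketch (not part of the diff; object name and paths are hypothetical) of the path-qualification behavior the relocated getQualifiedLocalPath relies on, now that the Hadoop Configuration is passed in explicitly:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifiedLocalPathSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // A schemeless URI is assumed to live on the local filesystem,
    // matching Hadoop's own defaulting behavior.
    val schemeless = new URI("/tmp/app.jar")
    val qualified = FileSystem.getLocal(hadoopConf).makeQualified(new Path(schemeless))
    println(qualified) // e.g. file:/tmp/app.jar
    // A URI that already carries a scheme is used as-is.
    val explicit = new URI("hdfs://namenode:8020/jars/app.jar")
    println(new Path(explicit)) // hdfs://namenode:8020/jars/app.jar
  }
}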