From 836248956ff3ef17d44cea37b357e3616b054d64 Mon Sep 17 00:00:00 2001
From: witgo
Date: Thu, 5 Jun 2014 00:50:12 +0800
Subject: [PATCH 01/11] yarn.ClientBase spark.yarn.dist.* do not work

---
 .../apache/spark/deploy/yarn/ClientBase.scala | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index aeb3f0062df3b..c23b533fa9eae 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -220,10 +220,21 @@ trait ClientBase extends Logging {
       }
     }
 
+    def getArg(arg: String, envVar: String, sysProp: String): String = {
+      if (arg != null && !arg.isEmpty) {
+        arg
+      } else if (System.getenv(envVar) != null && !System.getenv(envVar).isEmpty) {
+        System.getenv(envVar)
+      } else {
+        sparkConf.getOption(sysProp).orNull
+      }
+    }
     var cachedSecondaryJarLinks = ListBuffer.empty[String]
-    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
-      (args.files, LocalResourceType.FILE, false),
-      (args.archives, LocalResourceType.ARCHIVE, false) )
+    val fileLists = List((args.addJars, LocalResourceType.FILE, true),
+      (getArg(args.files, "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
+        LocalResourceType.FILE, false),
+      (getArg(args.archives, "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"),
+        LocalResourceType.ARCHIVE, false))
     fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
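Patch 01's `getArg` helper encodes a three-level fallback: an explicit command-line value wins, then the legacy `SPARK_YARN_DIST_*` environment variable, then the `spark.yarn.dist.*` entry in `SparkConf`. The following is a minimal, self-contained sketch of that precedence rule, not the patch itself: the `Map` stands in for `sparkConf.getOption`, and the sample key and value are invented for illustration.

    object FallbackDemo {
      // Stand-in for sparkConf.getOption(sysProp); the value is hypothetical.
      val conf = Map("spark.yarn.dist.files" -> "hdfs:///shared/data.csv")

      def getArg(arg: String, envVar: String, sysProp: String): String = {
        if (arg != null && !arg.isEmpty) arg                              // 1. explicit --files/--archives
        else if (sys.env.get(envVar).exists(_.nonEmpty)) sys.env(envVar)  // 2. SPARK_YARN_DIST_* env var
        else conf.get(sysProp).orNull                                     // 3. spark.yarn.dist.* property
      }

      def main(args: Array[String]): Unit = {
        // With no CLI value and no env var set, the property value is returned.
        println(getArg(null, "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"))
      }
    }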
From 8bc2f4be4520a88de0c77b13cefe9b4d805018c3 Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 13 Jun 2014 15:23:07 +0800
Subject: [PATCH 02/11] review commit

---
 .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 22110b6f739fb..debb4725e6b72 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -153,7 +153,7 @@ trait ClientBase extends Logging {
     val fs = FileSystem.get(conf)
     val remoteFs = originalPath.getFileSystem(conf)
     var newPath = originalPath
-    if (! compareFs(remoteFs, fs)) {
+    if (!compareFs(remoteFs, fs)) {
       newPath = new Path(dstDir, originalPath.getName())
       logInfo("Uploading " + originalPath + " to " + newPath)
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)

From 9cdff169252062d7252daac54d30d1a3097c3764 Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 13 Jun 2014 19:47:37 +0800
Subject: [PATCH 03/11] review commit

---
 .../spark/deploy/SparkSubmitArguments.scala   | 10 +++++++++-
 .../apache/spark/deploy/yarn/ClientBase.scala | 20 +++++---------------
 2 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1032ea8dbada..a2d21a131a82a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -104,7 +104,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     totalExecutorCores = Option(totalExecutorCores)
       .getOrElse(defaultProperties.get("spark.cores.max").orNull)
     name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").
+      map(p => Utils.resolveURIs(p)).orNull)
 
     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))
@@ -131,6 +132,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     if (name == null && primaryResource != null) {
       name = Utils.stripDirectory(primaryResource)
     }
+
+    if (master.startsWith("yarn")) {
+      archives = Option(archives).getOrElse(defaultProperties
+        .get("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)).orNull)
+      files = Option(files).getOrElse(defaultProperties
+        .get("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)).orNull)
+    }
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index debb4725e6b72..bf1e775c454b6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -219,21 +219,10 @@ trait ClientBase extends Logging {
       }
     }
 
-    def getArg(arg: String, envVar: String, sysProp: String): String = {
-      if (arg != null && !arg.isEmpty) {
-        arg
-      } else if (System.getenv(envVar) != null && !System.getenv(envVar).isEmpty) {
-        System.getenv(envVar)
-      } else {
-        sparkConf.getOption(sysProp).orNull
-      }
-    }
-    var cachedSecondaryJarLinks = ListBuffer.empty[String]
-    val fileLists = List((args.addJars, LocalResourceType.FILE, true),
-      (getArg(args.files, "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
-        LocalResourceType.FILE, false),
-      (getArg(args.archives, "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"),
-        LocalResourceType.ARCHIVE, false))
+    val cachedSecondaryJarLinks = ListBuffer.empty[String]
+    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+      (args.files, LocalResourceType.FILE, false),
+      (args.archives, LocalResourceType.ARCHIVE, false) )
     fileLists.foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { case file: String =>
@@ -251,6 +240,7 @@
         }
       }
     }
+    logInfo("Prepared Local resources " + localResources)
     sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
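Patch 03 routes the property values through `Utils.resolveURIs` before they are used. Judging from the call sites (this is a reading of the intent, not the actual `Utils.resolveURIs` source), every entry of a comma-separated list becomes a well-formed URI, with scheme-less local paths promoted to absolute `file:` URIs so they survive the hand-off to YARN. A sketch with those assumed semantics:

    import java.io.File
    import java.net.URI

    // e.g. "jar1.jar,hdfs:///lib/jar2.jar" -> "file:/current/dir/jar1.jar,hdfs:///lib/jar2.jar"
    def resolveURIs(paths: String): String =
      paths.split(",").map { p =>
        val uri = new URI(p)
        if (uri.getScheme != null) uri.toString          // already has a scheme: keep as-is
        else new File(p).getAbsoluteFile.toURI.toString  // bare local path: promote to file: URI
      }.mkString(",")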
From 35d6fa0425138db38563dc020253734d4e9a9607 Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 13 Jun 2014 22:30:33 +0800
Subject: [PATCH 04/11] move to ClientArguments

---
 .../apache/spark/deploy/SparkSubmitArguments.scala |  7 -------
 .../apache/spark/deploy/yarn/ClientArguments.scala | 11 +++++++++--
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index a2d21a131a82a..abd07dc971359 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -132,13 +132,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     if (name == null && primaryResource != null) {
       name = Utils.stripDirectory(primaryResource)
     }
-
-    if (master.startsWith("yarn")) {
-      archives = Option(archives).getOrElse(defaultProperties
-        .get("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)).orNull)
-      files = Option(files).getOrElse(defaultProperties
-        .get("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)).orNull)
-    }
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fd3ef9e1fa2de..3273af8b48595 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.InputFormatInfo
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 
@@ -45,6 +44,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
+  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").orNull)
+  files = Option(files).map(p => Utils.resolveURIs(p)).orNull
+
+  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").orNull)
+  archives = Option(archives).map(p => Utils.resolveURIs(p)).orNull
+
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
     val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
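The repeated `x = Option(x).getOrElse(default)` lines in patch 04 are a null-guard idiom: each assignment only fills the field if every earlier step left it null, which is how the mutable `files`/`archives` fields pick up layered defaults. A condensed illustration of the idiom (the `Map` is a stand-in for `SparkConf`, and the value is made up):

    val conf = Map("spark.yarn.dist.files" -> "hdfs:///shared/data.csv")  // stand-in for sparkConf
    var files: String = null                                              // set by --files, else null
    files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)  // env var next
    files = Option(files).getOrElse(conf.get("spark.yarn.dist.files").orNull)     // property last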
From 41bce59cc41cb039e5ec93da0bb0937f3027e61f Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 13 Jun 2014 23:29:04 +0800
Subject: [PATCH 05/11] review commit

---
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 3273af8b48595..9cb8256e25de1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -44,11 +44,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
-  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
   files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").orNull)
   files = Option(files).map(p => Utils.resolveURIs(p)).orNull
 
-  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
   archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").orNull)
   archives = Option(archives).map(p => Utils.resolveURIs(p)).orNull
 

From 871f1db727d7d6b38fd277019495a954b5e3fbb9 Mon Sep 17 00:00:00 2001
From: witgo
Date: Fri, 13 Jun 2014 23:49:40 +0800
Subject: [PATCH 06/11] add spark.yarn.dist.* documentation

---
 docs/running-on-yarn.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index af1788f2aa151..a38ed8c48e470 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
   </td>
 </tr>
+<tr>
+  <td>spark.yarn.dist.archives</td>
+  <td>(none)</td>
+  <td>
+    Comma separated list of archives to be extracted into the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td>spark.yarn.dist.files</td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of files to be placed in the working directory of each executor.
+  </td>
+</tr>
 </table>
 
 By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
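For reference, the two properties documented in patch 06 are read at submission time and can be set in `spark-defaults.conf` or on a `SparkConf` directly. A hypothetical example (the application name and paths are invented):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("dist-example")
      // placed, as-is, into each executor's working directory
      .set("spark.yarn.dist.files", "hdfs:///shared/lookup.csv,file:///etc/app/extra.properties")
      // extracted into each executor's working directory
      .set("spark.yarn.dist.archives", "hdfs:///shared/deps.zip")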
From 10485491a5e615b67a2309d0cec63bc208bf4e31 Mon Sep 17 00:00:00 2001
From: witgo
Date: Sun, 15 Jun 2014 12:01:10 +0800
Subject: [PATCH 07/11] remove Utils.resolveURIs

---
 .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 3 +--
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 3 ---
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index abd07dc971359..f1032ea8dbada 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -104,8 +104,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     totalExecutorCores = Option(totalExecutorCores)
       .getOrElse(defaultProperties.get("spark.cores.max").orNull)
     name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").
-      map(p => Utils.resolveURIs(p)).orNull)
+    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
 
     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9cb8256e25de1..4069dfcd6276e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -45,10 +45,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   parseArgs(args.toList)
 
   files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").orNull)
-  files = Option(files).map(p => Utils.resolveURIs(p)).orNull
-
   archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").orNull)
-  archives = Option(archives).map(p => Utils.resolveURIs(p)).orNull
 
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
From c8b4554dd09870cc193ea5c22ece6eda3bbe61e5 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 17 Jun 2014 00:40:45 +0800
Subject: [PATCH 08/11] review commit

---
 .../spark/deploy/yarn/ClientArguments.scala   | 20 ++++++++++++++++++--
 .../cluster/YarnClientSchedulerBackend.scala  |  4 +---
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 4069dfcd6276e..e6db4c542d700 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -44,8 +44,24 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
-  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").orNull)
-  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").orNull)
+  // -archives/--files via spark submit or yarn-client defaults to use file:// if not specified
+  if (sys.props.contains("SPARK_SUBMIT") || (sparkConf.getOption("spark.master").isDefined &&
+    sparkConf.get("spark.master") == "yarn-client")) {
+    files = Option(files).map(p => Utils.resolveURIs(p)).orNull
+    archives = Option(archives).map(p => Utils.resolveURIs(p)).orNull
+  }
+
+  // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
+  // it should default to hdfs://
+  files = Option(files).getOrElse(sys.props.get("SPARK_YARN_DIST_FILES").orNull)
+  archives = Option(archives).getOrElse(sys.props.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+
+  // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
+  // for both yarn-client and yarn-cluster
+  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
+    map(p => Utils.resolveURIs(p)).orNull)
+  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
+    map(p => Utils.resolveURIs(p)).orNull)
 
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f276119..412dfe38d55eb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
       ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
       ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
-      ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
-      ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
       .foreach { case (optName, envVar, sysProp) =>
         addArg(optName, envVar, sysProp, argsArrayBuf) }
     logDebug("ClientArguments called with: " + argsArrayBuf)
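After patch 08 the resolution order is: an explicit `--files`/`--archives` argument (URI-resolved when it arrives via spark-submit or yarn-client, so bare paths mean local files), then the `SPARK_YARN_DIST_*` environment variables (taken verbatim, so bare paths fall through to the cluster's default filesystem, typically HDFS), then the `spark.yarn.dist.*` properties (URI-resolved again). A compressed sketch of that decision table, with `resolveURIs` as a parameter standing in for the real helper:

    def resolve(
        cliArg: Option[String],           // --files / --archives
        envVal: Option[String],           // SPARK_YARN_DIST_FILES / _ARCHIVES
        confVal: Option[String],          // spark.yarn.dist.files / .archives
        resolveURIs: String => String     // bare entries -> file: URIs
    ): Option[String] =
      cliArg.map(resolveURIs)             // 1. CLI wins; local-file semantics
        .orElse(envVal)                   // 2. env var; default-FS (hdfs) semantics
        .orElse(confVal.map(resolveURIs)) // 3. property; local-file semantics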
From e3c110712fde779d70a4df8a54211607d552fc91 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 17 Jun 2014 09:51:59 +0800
Subject: [PATCH 09/11] update docs

---
 docs/running-on-yarn.md | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index d2a799c3f4886..fecd8f2cc2d48 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -79,15 +79,18 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   <td>(none)</td>
   <td>
     Comma-separated list of files to be placed in the working directory of each executor.
-  <td>spark.yarn.executor.memoryOverhead</td>
-  <td>384</td>
+  </td>
+</tr>
+<tr>
+  <td>spark.yarn.executor.memoryOverhead</td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 <tr>
   <td>spark.yarn.driver.memoryOverhead</td>
- <td>384</td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>

From 5261b6c362fce2cbaa39fc888a45274947411d78 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 17 Jun 2014 17:06:15 +0800
Subject: [PATCH 10/11] fix sys.props.get("SPARK_YARN_DIST_FILES")

---
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index e6db4c542d700..acdc05f5529d3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -53,8 +53,8 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
   // it should default to hdfs://
-  files = Option(files).getOrElse(sys.props.get("SPARK_YARN_DIST_FILES").orNull)
-  archives = Option(archives).getOrElse(sys.props.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
 
   // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
   // for both yarn-client and yarn-cluster
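Patch 10 corrects a subtle slip in patch 08: `SPARK_YARN_DIST_FILES` is an environment variable, and `sys.props` never sees it, so the env-var fallback was silently dead. In Scala the two namespaces are disjoint:

    sys.props.get("SPARK_YARN_DIST_FILES") // JVM system properties (-Dkey=value): None here
    sys.env.get("SPARK_YARN_DIST_FILES")   // OS environment (export SPARK_YARN_DIST_FILES=...): the intended lookup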
From 8117765daa02b03b81c12c1869ef75eda6653392 Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 17 Jun 2014 23:34:52 +0800
Subject: [PATCH 11/11] review commit

---
 .../org/apache/spark/deploy/yarn/ClientArguments.scala | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index acdc05f5529d3..62f9b3cf5ab88 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -44,13 +44,6 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
-  // -archives/--files via spark submit or yarn-client defaults to use file:// if not specified
-  if (sys.props.contains("SPARK_SUBMIT") || (sparkConf.getOption("spark.master").isDefined &&
-    sparkConf.get("spark.master") == "yarn-client")) {
-    files = Option(files).map(p => Utils.resolveURIs(p)).orNull
-    archives = Option(archives).map(p => Utils.resolveURIs(p)).orNull
-  }
-
   // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
   // it should default to hdfs://
   files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
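With all eleven patches applied, the resolution block that survives in ClientArguments.scala (reassembled here from patches 08, 10 and 11) reads:

    // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
    // it should default to hdfs://
    files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
    archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)

    // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
    // for both yarn-client and yarn-cluster
    files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
      map(p => Utils.resolveURIs(p)).orNull)
    archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
      map(p => Utils.resolveURIs(p)).orNull)

Net precedence: an explicit --files/--archives argument first, the legacy SPARK_YARN_DIST_* environment variables second, and the spark.yarn.dist.* properties (URI-resolved) last, which is the behaviour the documentation added in patch 06 describes.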