20 changes: 17 additions & 3 deletions docs/running-on-yarn.md
@@ -68,15 +68,29 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td>384</code></td>
<td><code>spark.yarn.dist.archives</code></td>
<td>(none)</td>
<td>
Comma-separated list of archives to be extracted into the working directory of each executor.
</td>
</tr>
<tr>
<td><code>spark.yarn.dist.files</code></td>
<td>(none)</td>
<td>
Comma-separated list of files to be placed in the working directory of each executor.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td>384</td>
<td>
The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>384</code></td>
<td>384</td>
<td>
The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
</td>
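
The distribution and overhead properties documented above can also be set programmatically. A hedged sketch, assuming placeholder paths and app name (only the property names come from the table):

```scala
import org.apache.spark.SparkConf

// Illustrative only: the file/archive locations and overhead sizes below are made up.
object YarnDistConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-dist-conf-example")
      // Local file, qualified explicitly with file:// (the default scheme for these
      // properties after this change)
      .set("spark.yarn.dist.files", "file:///opt/app/lookup.properties")
      // Already-qualified HDFS archive; an explicit scheme is left untouched
      .set("spark.yarn.dist.archives", "hdfs:///apps/shared/resources.zip")
      // Off-heap headroom in megabytes, on top of the executor/driver heap
      .set("spark.yarn.executor.memoryOverhead", "512")
      .set("spark.yarn.driver.memoryOverhead", "512")
    println(conf.toDebugString)
  }
}
```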
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.util.IntParam
import org.apache.spark.util.MemoryParam
import org.apache.spark.util.{Utils, IntParam, MemoryParam}


// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
@@ -45,6 +44,18 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {

parseArgs(args.toList)

// If the env variables SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES are set in yarn-client mode,
// they should default to hdfs://
files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
Contributor: I don't want to add support for the env variables for yarn-cluster mode. We only support them on yarn-client mode for backwards compatibility. Can you remove this.

Contributor: Again, this is already handled in the YarnClientSchedulerBackend: it reads the env variables and passes in --files/--archives without resolveURI-extending them. The issue with that code is that it also looks at spark.yarn.dist.archives and spark.yarn.dist.files and doesn't resolveURI-extend them.

archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
Contributor: Same as above comment.


// spark.yarn.dist.archives/spark.yarn.dist.files default to the file:// scheme if not specified,
// for both yarn-client and yarn-cluster
files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
map(p => Utils.resolveURIs(p)).orNull)
archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
map(p => Utils.resolveURIs(p)).orNull)

private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
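
A note on the scheme-defaulting the two assignments above implement: paths in spark.yarn.dist.files/spark.yarn.dist.archives that carry no scheme are expanded to explicit file:// URIs, while already-qualified URIs pass through. A minimal standalone sketch of that behavior (not the actual Utils.resolveURIs implementation; names and paths are assumed):

```scala
import java.io.File
import java.net.URI

object ResolveUriSketch {
  // Mirrors the intent of resolving a comma-separated list of paths:
  // no scheme -> absolute file: URI, otherwise keep the URI as given.
  def resolve(paths: String): String =
    paths.split(",").map { p =>
      val uri = new URI(p)
      if (uri.getScheme != null) p
      else new File(p).getAbsoluteFile.toURI.toString
    }.mkString(",")

  def main(args: Array[String]): Unit = {
    // "lib/extra.jar" becomes file:/<cwd>/lib/extra.jar;
    // "hdfs:///data/archive.zip" is left untouched.
    println(resolve("lib/extra.jar,hdfs:///data/archive.zip"))
  }
}
```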
@@ -162,7 +162,7 @@ trait ClientBase extends Logging {
val fs = FileSystem.get(conf)
val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
if (!compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
@@ -250,6 +250,7 @@
}
}
}
}
logInfo("Prepared Local resources " + localResources)
sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))

UserGroupInformation.getCurrentUser().addCredentials(credentials)
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
.foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }

logDebug("ClientArguments called with: " + argsArrayBuf)
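
With the --files/--archives rows removed, the remaining tuples still map an env variable or a system property onto a client argument; distribution paths are instead picked up (and resolveURI-extended) in ClientArguments, per the review discussion above. A rough sketch of the mapping pattern with assumed names (not the actual addArg implementation):

```scala
import scala.collection.mutable.ArrayBuffer

object AddArgSketch {
  // Emit "--opt value" only when the env variable or the system property is set.
  def addArg(optName: String, envVar: String, sysProp: String,
             props: Map[String, String], buf: ArrayBuffer[String]): Unit = {
    sys.env.get(envVar).orElse(props.get(sysProp)).foreach { value =>
      buf ++= Seq(optName, value)
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = new ArrayBuffer[String]()
    val props = Map("spark.yarn.queue" -> "default", "spark.app.name" -> "demo")
    Seq(("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
      .foreach { case (opt, env, prop) => addArg(opt, env, prop, props, buf) }
    println(buf.mkString(" ")) // --queue default --name demo (given neither env var is set)
  }
}
```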