44 changes: 17 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -441,7 +441,6 @@ object SparkSubmit {
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
@@ -452,27 +451,15 @@ object SparkSubmit {
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),

// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"),
OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
Contributor Author:
Here I put all the additional jars into the configuration spark.yarn.dist.jars; this is picked up by yarn/Client and placed into the distributed cache. So now, in both yarn client and cluster mode, additional jars are put into the distributed cache.

Another question: do we also need to put the user jar into the distributed cache for yarn client mode? I think it is doable, but I'm not sure whether there is any special concern.

Contributor:
I think we should just leave that as is for now. We can file a separate JIRA if we want to change it.

Contributor:
dist.files and dist.archives are public and documented, so it seems like we should make dist.jars public and document it in the YARN docs as well, unless someone has a reason not to.

Contributor Author:
Sure, I will add it to the YARN docs.

Contributor Author:
Looks like there's already a config, "spark.jars", that handles this; maybe we don't need to add another one, and dist.jars could be kept as internal, YARN-only usage.

Contributor:
spark.jars is for distributing jars via Spark's internal mechanisms, whereas this one goes through the distributed cache. We should add it to the YARN-only section of the docs, similar to dist.files and dist.archives.
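For illustration, here is a minimal sketch of what "put into the distributed cache" means on the YARN side: the client registers each jar as a LocalResource so the NodeManager downloads it into every container's working directory. This is not Spark's actual Client code; the helper name addJarToCache is hypothetical and only the standard Hadoop YARN API is used.

import scala.collection.mutable
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

// Hypothetical helper: register one jar as a YARN LocalResource so that the
// NodeManager localizes it into each container's working directory.
def addJarToCache(
    fs: FileSystem,
    jar: Path,
    resources: mutable.Map[String, LocalResource]): Unit = {
  val status = fs.getFileStatus(jar)
  val resource = LocalResource.newInstance(
    ConverterUtils.getYarnUrlFromPath(jar), // remote location to fetch from
    LocalResourceType.FILE,                 // plain file, no unpacking
    LocalResourceVisibility.PRIVATE,        // visible only to this user's containers
    status.getLen,
    status.getModificationTime)
  resources(jar.getName) = resource         // keyed by the link name in the container
}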

OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -483,10 +470,11 @@ object SparkSubmit {
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
Contributor:
Removing this and letting it use spark.jars means jars are distributed via Spark's addJars mechanism instead of the YARN distributed cache, as they were before. I'm not sure I want to change that; I would rather stay as consistent as possible with the YARN distributed cache unless we have a reason to change.

If this means we need to add another config like spark.yarn.dist.jars, I'm fine with that. We can leave it undocumented for now if we want, since spark.jars/spark.files isn't documented either, though I'm not quite sure why.

Contributor:
Yeah, we shouldn't lose the distributed cache functionality.

On that topic, but probably orthogonal to this change, at some point I'd like to see yarn-client somehow also use the distributed cache for these jars. IIRC that doesn't happen at the moment.

Contributor:
(Reading the rest of the change, if you figure out an easy way to have YARN's Client class handle spark.jars, you could solve both problems here... maybe some check in SparkContext to not distribute them in YARN mode?)

Contributor Author:
I see, thanks a lot for your suggestions.
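As a purely hypothetical illustration of the suggestion above (not part of this change), a check in SparkContext could look roughly like this: skip SparkContext's own jar distribution when running on YARN and the jars are already handed to the YARN client through spark.yarn.dist.jars.

import org.apache.spark.SparkConf

// Hypothetical guard: returns true when SparkContext should distribute jars
// itself, false when YARN's distributed cache already handles them.
def distributeJarsFromSparkContext(conf: SparkConf): Boolean = {
  val onYarn = conf.get("spark.master", "").startsWith("yarn")
  val yarnHandlesJars = conf.get("spark.yarn.dist.jars", "").nonEmpty
  !(onYarn && yarnHandlesJars)
}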

OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
@@ -550,6 +538,10 @@ object SparkSubmit {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}

if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

// assure a keytab is available from any place in a JVM
@@ -576,9 +568,6 @@ object SparkSubmit {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
if (args.pyFiles != null) {
childArgs += ("--py-files", args.pyFiles)
}
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
@@ -627,7 +616,8 @@ object SparkSubmit {
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives")
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -18,6 +18,7 @@
package org.apache.spark.internal

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit

package object config {

@@ -33,6 +34,10 @@ package object config {
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)

private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.bytesConf(ByteUnit.MiB)
.withDefaultString("1g")

private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional

@@ -45,6 +50,10 @@ package object config {
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)

private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.bytesConf(ByteUnit.MiB)
.withDefaultString("1g")

private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
.booleanConf.withDefault(false)

@@ -73,4 +82,9 @@ package object config {

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional

private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
.internal
.stringConf
.toSequence
.withDefault(Nil)
}
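For context, a minimal sketch of how these typed entries are read elsewhere in Spark's internals, assuming the internal SparkConf.get(ConfigEntry) accessor; the values shown are illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()
  .set("spark.driver.memory", "4g")
  .set("spark.submit.pyFiles", "deps.zip,util.py")

// bytesConf(ByteUnit.MiB) parses size strings into MiB; the default is "1g".
val driverMemMiB: Long = conf.get(DRIVER_MEMORY)     // 4096
val executorMemMiB: Long = conf.get(EXECUTOR_MEMORY) // 1024 (default)

// stringConf.toSequence splits the comma-separated value; Nil when unset.
val pyFiles: Seq[String] = conf.get(PY_FILES)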
19 changes: 10 additions & 9 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -199,21 +199,21 @@ class SparkSubmitSuite
val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
childArgsStr should include ("--class org.SomeClass")
childArgsStr should include ("--executor-memory 5g")
childArgsStr should include ("--driver-memory 4g")
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
childArgsStr should include regex ("--jar .*thejar.jar")
childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)

sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.driver.memory") should be ("4g")
sysProps("spark.executor.cores") should be ("5")
sysProps("spark.yarn.queue") should be ("thequeue")
sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.ui.enabled") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}

test("handles YARN client mode") {
@@ -249,7 +249,8 @@ class SparkSubmitSuite
sysProps("spark.executor.instances") should be ("6")
sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("spark.yarn.dist.jars") should include
regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps("spark.ui.enabled") should be ("false")
}
7 changes: 7 additions & 0 deletions docs/running-on-yarn.md
@@ -215,6 +215,13 @@ If you need a reference to the proper location to put log files in the YARN so t
Comma-separated list of files to be placed in the working directory of each executor.
</td>
</tr>
<tr>
<td><code>spark.yarn.dist.jars</code></td>
<td>(none)</td>
<td>
Comma-separated list of jars to be placed in the working directory of each executor.
</td>
</tr>
<tr>
<td><code>spark.executor.cores</code></td>
<td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
@@ -662,7 +662,7 @@ object ApplicationMaster extends Logging {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
}
}
@@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
var propertiesFile: String = null

parseArgs(args.toList)
@@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
primaryRFile = value
args = tail

case ("--args" | "--arg") :: value :: tail =>
case ("--arg") :: value :: tail =>
userArgsBuffer += value
args = tail

case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
executorMemory = value
args = tail

case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
executorCores = value
args = tail

case ("--properties-file") :: value :: tail =>
propertiesFile = value
args = tail
@@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --primary-r-file A main R file
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --executor-cores NUM Number of cores for the executors (Default: 1)
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --properties-file FILE Path to a custom Spark properties file.
""".stripMargin)
// scalastyle:on println