Commit b08893b (1 parent: ace4ead)

Additional improvements.

- Made driver & executor options consistent.
- Some doc improvements to YARN code.
- Handled special flags on YARN.
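As a rough usage sketch of the now-consistent key names (not part of the commit; all values below are invented), an application can set the driver and executor options side by side:

import org.apache.spark.SparkConf

// Hypothetical values; only the key names come from this commit.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.driver.extraClassPath", "/opt/extra/libs.jar")
  .set("spark.driver.extraJavaOptions", "-verbose:gc")
  .set("spark.driver.extraLibraryPath", "/opt/native")
  .set("spark.executor.extraClassPath", "/opt/extra/libs.jar")
  .set("spark.executor.extraJavaOptions", "-verbose:gc")
  .set("spark.executor.extraLibraryPath", "/opt/native")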

File tree: 15 files changed, +96 -53 lines


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 2 additions & 2 deletions
@@ -224,8 +224,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
           throw new Exception(msg)
         }
         if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
-          val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). Please use " +
-            "spark.executor.memory."
+          val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). Use " +
+            "spark.executor.memory instead."
           throw new Exception(msg)
         }
       }
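The check above rejects heap flags smuggled in through the executor JVM options. A minimal sketch of the behaviour it enforces (values invented; the check runs when the conf is validated, e.g. when a SparkContext is created):

import org.apache.spark.SparkConf

// Rejected at validation time: heap size must go through spark.executor.memory.
val bad = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Xmx4g")

// Accepted: heap size expressed through the dedicated setting.
val good = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.extraJavaOptions", "-verbose:gc")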

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 7 deletions
@@ -246,27 +246,28 @@ class SparkContext(config: SparkConf) extends Logging {
     .map(Utils.memoryStringToMb)
     .getOrElse(512)

-  // Environment variables to pass to our executors
-  private[spark] val executorEnvs = HashMap[String, String]()
+  // Environment variables to pass to our executors.
+  // NOTE: This should only be used for test related settings.
+  private[spark] val testExecutorEnvs = HashMap[String, String]()

   // Convert java options to env vars as a work around
   // since we can't set env vars directly in sbt.
-  for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
+  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
-    executorEnvs(envKey) = value
+    testExecutorEnvs(envKey) = value
   }
   // The Mesos scheduler backend relies on this environment variable to set executor memory.
   // TODO: Set this only in the Mesos scheduler.
-  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
-  executorEnvs ++= conf.getExecutorEnv
+  testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+  testExecutorEnvs ++= conf.getExecutorEnv

   // Set SPARK_USER for user who is running SparkContext.
   val sparkUser = Option {
     Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
   }.getOrElse {
     SparkContext.SPARK_UNKNOWN_USER
   }
-  executorEnvs("SPARK_USER") = sparkUser
+  testExecutorEnvs("SPARK_USER") = sparkUser

   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
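The renamed testExecutorEnvs map is still populated from conf.getExecutorEnv, so per-executor environment variables set on the conf keep flowing through; a minimal sketch (variable name and value are illustrative, not from this commit):

import org.apache.spark.SparkConf

// Entries set this way end up in SparkContext.testExecutorEnvs via conf.getExecutorEnv.
val conf = new SparkConf()
  .setAppName("env-example")
  .setExecutorEnv("MY_SERVICE_URL", "http://example.internal:8080")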

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 2 additions & 2 deletions
@@ -54,10 +54,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
     System.getenv().foreach{case (k, v) => env(k) = v}

     val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
-    val classPathEntries = sys.props.get("spark.driver.classPath").toSeq.flatMap { cp =>
+    val classPathEntries = sys.props.get("spark.driver.extraClassPath").toSeq.flatMap { cp =>
       cp.split(java.io.File.pathSeparator)
     }
-    val libraryPathEntries = sys.props.get("spark.driver.libraryPath").toSeq.flatMap { cp =>
+    val libraryPathEntries = sys.props.get("spark.driver.extraLibraryPath").toSeq.flatMap { cp =>
       cp.split(java.io.File.pathSeparator)
     }
     val javaOpts = sys.props.get("spark.driver.extraJavaOptions")

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 3 deletions
@@ -144,11 +144,11 @@ object SparkSubmit {
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),

       new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
-        sysProp = "spark.driver.classPath"),
+        sysProp = "spark.driver.extraClassPath"),
       new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
-        sysProp = "spark.driver.javaOpts"),
+        sysProp = "spark.driver.extraJavaOptions"),
       new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
-        sysProp = "spark.driver.libraryPath"),
+        sysProp = "spark.driver.extraLibraryPath"),

       new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
       new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 8 additions & 3 deletions
@@ -47,11 +47,16 @@ private[spark] class SparkDeploySchedulerBackend(
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}",
       "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+    val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }
+    val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }

-    // TODO (pwendell) LOOK AT THIS
     val command = Command(
-      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
-      Seq(), Seq(), extraJavaOpts)
+      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs,
+      classPathEntries, libraryPathEntries, extraJavaOpts)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
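The new classPathEntries and libraryPathEntries values are split on the platform path separator before being handed to Command; a standalone illustration of that split (paths are made up):

// On Linux java.io.File.pathSeparator is ":", so a colon-separated property value
// becomes one classpath entry per element.
val raw = Some("/opt/libs/a.jar:/opt/libs/b.jar")
val classPathEntries = raw.toSeq.flatMap(_.split(java.io.File.pathSeparator))
// => Seq("/opt/libs/a.jar", "/opt/libs/b.jar")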

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 10 additions & 2 deletions
@@ -111,7 +111,15 @@ private[spark] class CoarseMesosSchedulerBackend(

   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
+    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+    extraClassPath.foreach { cp =>
+      environment.addVariables(Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+    }
+    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+    val extraLibraryPath = conf.getOption("spark.executor.extraLibraryPath").map(p => s"-Djava.library.path=$p")
+    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+
+    sc.testExecutorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
         .setName(key)
         .setValue(value)
@@ -123,7 +131,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val extraOpts = conf.get("spark.executor.extraJavaOptions")
+
     val uri = conf.get("spark.executor.uri", null)
     if (uri == null) {
       val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
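The extraOpts string is just the optional Java options and the optional -Djava.library.path flag joined with a space; a small sketch of that combination with invented values:

val extraJavaOpts    = Some("-verbose:gc")                                     // spark.executor.extraJavaOptions
val extraLibraryPath = Some("/opt/native").map(p => s"-Djava.library.path=$p") // spark.executor.extraLibraryPath
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
// => "-verbose:gc -Djava.library.path=/opt/native"; either Option may be None and is then simply dropped.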

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
+    sc.testExecutorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
         .setName(key)
         .setValue(value)

docs/configuration.md

Lines changed: 16 additions & 0 deletions
@@ -655,6 +655,22 @@ Apart from these, the following properties are also available, and may be useful
     option.
   </td>
 </tr>
+<tr>
+  <td>spark.executor.extraClassPath</td>
+  <td>(none)</td>
+  <td>
+    Extra classpath entries to append to the classpath of executors. This exists primarily
+    for backwards-compatibility with older versions of Spark. Users typically should not need
+    to set this option.
+  </td>
+</tr>
+<tr>
+  <td>spark.executor.extraLibraryPath</td>
+  <td>(none)</td>
+  <td>
+    Set a special library path to use when launching executor JVMs.
+  </td>
+</tr>

 </table>

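A hedged usage sketch of the two properties documented above (the path is invented; the "(none)" default is visible when reading a key that was never set):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraLibraryPath", "/opt/native/lib")

conf.getOption("spark.executor.extraClassPath")   // None: the documented default is (none)
conf.getOption("spark.executor.extraLibraryPath") // Some("/opt/native/lib")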
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 3 additions & 1 deletion
@@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.util.{Apps, Records}

 import org.apache.spark.{Logging, SparkConf}

-
+/**
+ * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
+ */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   extends YarnClientImpl with ClientBase with Logging {

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 28 additions & 28 deletions
@@ -39,8 +39,11 @@ import org.apache.spark.{Logging, SparkConf}

 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
- * which will launch a Spark master process and negotiate resources throughout its duration.
+ * Client submits an application to the YARN ResourceManager.
+ *
+ * Depending on the deployment mode this will launch one of two application master classes:
+ * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]] which embeds a driver.
+ * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]].
  */
 trait ClientBase extends Logging {
   val args: ClientArguments
@@ -259,8 +262,8 @@ trait ClientBase extends Logging {

     val env = new HashMap[String, String]()

-    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
-      env)
+    val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
+    ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP), env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -272,9 +275,6 @@ trait ClientBase extends Logging {
     // Allow users to specify some environment variables.
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))

-    // Add each SPARK_* key to the environment.
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-
     env
   }

@@ -427,29 +427,29 @@ object ClientBase {
     }
   }

-  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String],
+      extraClassPath: Option[String] = None) {
+
+    /** Add entry to the classpath. */
+    def addClasspathEntry(entry: String) = Apps.addToEnvironment(env, Environment.CLASSPATH.name, entry)
+    /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
+    def addPwdClasspathEntry(entry: String) = addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
+
+    extraClassPath.foreach(addClasspathEntry)
+
+    addClasspathEntry(Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
-    if (addLog4j) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + LOG4J_PROP)
-    }
+    if (addLog4j) addPwdClasspathEntry(LOG4J_PROP)
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
-      .toBoolean
-    if (userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
-    }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + SPARK_JAR)
-    ClientBase.populateHadoopClasspath(conf, env)
-
-    if (!userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-        Path.SEPARATOR + APP_JAR)
+    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
+      addPwdClasspathEntry(APP_JAR)
+      addPwdClasspathEntry(SPARK_JAR)
+      ClientBase.populateHadoopClasspath(conf, env)
+    } else {
+      addPwdClasspathEntry(SPARK_JAR)
+      ClientBase.populateHadoopClasspath(conf, env)
+      addPwdClasspathEntry(APP_JAR)
     }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
-      Path.SEPARATOR + "*")
+    addPwdClasspathEntry("*")
   }
 }
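The refactored helper makes the classpath ordering explicit. A rough sketch of the default (user jar last) branch, assuming it is called from code in the org.apache.spark.deploy.yarn package; the extra entry is invented and the exact Hadoop jars appended by populateHadoopClasspath depend on the cluster configuration:

import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val env = new HashMap[String, String]()
// Positional args: hadoop conf, spark conf, addLog4j, env, extraClassPath.
ClientBase.populateClasspath(new Configuration(), new SparkConf(), false, env, Some("/opt/site/udfs.jar"))
// env("CLASSPATH") now lists, in order: /opt/site/udfs.jar, $PWD, $PWD/spark.jar,
// the Hadoop classpath, $PWD/app.jar, and finally $PWD/*.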
