
Commit a853e74

Author: Marcelo Vanzin

Re-work CoarseGrainedExecutorBackend command line arguments.
Preparation for changes to come.
1 parent 89522ef

5 files changed: 88 additions & 33 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 57 additions & 12 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
+import scala.collection.mutable
 import scala.concurrent.Await
 
 import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
@@ -147,20 +148,64 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   }
 
   def main(args: Array[String]) {
-    args.length match {
-      case x if x < 5 =>
-        System.err.println(
+    var driverUrl: String = null
+    var executorId: String = null
+    var hostname: String = null
+    var cores: Int = 0
+    var appId: String = null
+    var workerUrl: Option[String] = None
+
+    var argv = args.toList
+    while (!argv.isEmpty) {
+      argv match {
+        case ("--driver-url") :: value :: tail =>
+          driverUrl = value
+          argv = tail
+        case ("--executor-id") :: value :: tail =>
+          executorId = value
+          argv = tail
+        case ("--hostname") :: value :: tail =>
+          hostname = value
+          argv = tail
+        case ("--cores") :: value :: tail =>
+          cores = value.toInt
+          argv = tail
+        case ("--app-id") :: value :: tail =>
+          appId = value
+          argv = tail
+        case ("--worker-url") :: value :: tail =>
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
-          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
-          "<cores> <appid> [<workerUrl>] ")
-        System.exit(1)
+          workerUrl = Some(value)
+          argv = tail
+        case Nil =>
+        case tail =>
+          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+          printUsageAndExit()
+      }
+    }
 
-      // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
-      // and CoarseMesosSchedulerBackend (for mesos mode).
-      case 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
-      case x if x > 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+      appId == null) {
+      printUsageAndExit()
     }
+
+    run(driverUrl, executorId, hostname, cores, appId, workerUrl)
   }
+
+  private def printUsageAndExit() = {
+    System.err.println(
+      """
+      |Usage: CoarseGrainedExecutorBackend [options]
+      |
+      | Options are:
+      |   --driver-url <driverUrl>
+      |   --executor-id <executorId>
+      |   --hostname <hostname>
+      |   --cores <cores>
+      |   --app-id <appid>
+      |   [--worker-url <workerUrl>]
+      |""".stripMargin)
+    System.exit(1)
+  }
+
 }
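The rewritten main() uses a common Scala idiom for small command-line parsers: each `"--flag" :: value :: tail` case peels the flag and its value off the remaining argument list in one match. Below is a standalone sketch of just that idiom; the object name and option set are illustrative, not part of Spark:

// Standalone sketch of the list-pattern-matching parser idiom used above.
object ArgParseSketch {
  def main(args: Array[String]): Unit = {
    var driverUrl: String = null
    var cores: Int = 0

    var argv = args.toList
    while (argv.nonEmpty) {
      argv match {
        // "--flag" :: value :: tail consumes two tokens per option
        case "--driver-url" :: value :: tail =>
          driverUrl = value
          argv = tail
        case "--cores" :: value :: tail =>
          cores = value.toInt
          argv = tail
        // Unknown flags, or a flag missing its value, land here and are fatal
        case unknown =>
          System.err.println(s"Unrecognized options: ${unknown.mkString(" ")}")
          sys.exit(1)
      }
    }
    println(s"driverUrl=$driverUrl cores=$cores")
  }
}

Running it with `--driver-url spark://host:7077 --cores 4` prints both values; a trailing flag with no value cannot satisfy the two-element pattern and falls through to the catch-all, which is the same error path the commit uses for unrecognized options.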

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 7 additions & 2 deletions

@@ -51,8 +51,13 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
-      "{{WORKER_URL}}")
+    val args = Seq(
+      "--driver-url", driverUrl,
+      "--executor-id", "{{EXECUTOR_ID}}",
+      "--hostname", "{{HOSTNAME}}",
+      "--cores", "{{CORES}}",
+      "--app-id", "{{APP_ID}}",
+      "--worker-url", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
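The `{{...}}` tokens are not parsed here; they are placeholders that the standalone Worker substitutes with per-executor values before launching the process. The sketch below illustrates that substitution step under made-up values (Spark's real implementation lives in the worker-side launch code, not in this file; every value here is invented):

// Sketch only: how the {{...}} placeholders in the command template
// could be replaced per executor. All values below are invented.
object SubstituteSketch {
  def main(args: Array[String]): Unit = {
    val template = Seq(
      "--driver-url", "akka.tcp://sparkDriver@10.0.0.1:7077/user/CoarseGrainedScheduler",
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")

    val bindings = Map(
      "{{EXECUTOR_ID}}" -> "3",
      "{{HOSTNAME}}" -> "worker-1",
      "{{CORES}}" -> "4",
      "{{APP_ID}}" -> "app-20141023101112-0001",
      "{{WORKER_URL}}" -> "akka.tcp://sparkWorker@worker-1:34231/user/Worker")

    // Placeholder tokens map to concrete values; everything else passes through.
    val argv = template.map(arg => bindings.getOrElse(arg, arg))
    println(argv.mkString(" "))
  }
}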

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 14 additions & 7 deletions

@@ -153,18 +153,25 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
-          prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
-          offer.getHostname, numCores, appId))
+        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+          .format(prefixEnv, runScript) +
+        s" --driver-url $driverUrl" +
+        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        ("cd %s*; %s " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
-          .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores, appId))
+        s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+        s" --driver-url $driverUrl" +
+        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --hostname ${offer.getHostname}" +
+        s" --cores $numCores" +
+        s" --app-id $appId")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()
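Where the standalone backend hands over a Seq of argument tokens, the Mesos backend concatenates everything into a single shell string. With invented sample values (empty prefixEnv, made-up slave ID and app ID), the uri == null branch above produces a command of roughly this shape:

"/opt/spark/bin/spark-class" org.apache.spark.executor.CoarseGrainedExecutorBackend \
  --driver-url akka.tcp://sparkDriver@10.0.0.1:7077/user/CoarseGrainedScheduler \
  --executor-id 20141023-101112-16777343-5050-1234-S0 \
  --hostname mesos-slave-1 \
  --cores 8 \
  --app-id app-20141023101112-0001

Note the concatenation convention: every appended fragment starts with " --" so the pieces join into one well-formed command line.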

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 5 additions & 7 deletions

@@ -661,13 +661,11 @@ private[spark] object ClientBase extends Logging {
   /**
    * Populate the classpath entry in the given environment map.
    *
-   * This does different things depending on the job configuration.
-   * - if `spark.files.userClassPathFirst` is set to true, only Spark and other framework jars
-   *   (such as Hadoop/Yarn jars) are added to the classpath. User jars and files, and also the
-   *   extra class path, are handled by ChildExecutorURLClassLoader.
-   * - otherwise, user jars, files and the extra class path are added to the container's class path.
-   *   The position of the user classes in the classpath depends on the value of the
-   *   `spark.yarn.user.classpath.first` configuration.
+   * Class path isolation, when enabled, makes the user-added jars be loaded from a different
+   * class loader than other class path entries. The extra class path and other uploaded files
+   * are still made available through the system class path.
+   *
+   * @param args Client arguments (when starting the AM) or null (when starting executors).
    */
  def populateClasspath(
      args: ClientArguments,
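For readers unfamiliar with the isolation the new doc comment describes, the sketch below shows the basic mechanism in plain JDK terms: user jars go into their own class loader while framework classes stay on the system class path. This is a simplified illustration, not Spark's ChildExecutorURLClassLoader (which additionally gives user classes precedence); the jar path and class name are invented:

// Simplified illustration of class path isolation; not Spark's implementation.
import java.net.{URL, URLClassLoader}

object IsolationSketch {
  def main(args: Array[String]): Unit = {
    // Invented path: the user's application jar.
    val userJars = Array(new URL("file:/tmp/user-app.jar"))

    // Classes found only in the user jar are defined by this child loader;
    // shared classes (Spark, Hadoop) still resolve through the parent,
    // i.e. the system class path.
    val userLoader = new URLClassLoader(userJars, getClass.getClassLoader)

    val cls = userLoader.loadClass("com.example.UserMain") // invented class name
    println(s"Loaded ${cls.getName} via ${cls.getClassLoader}")
  }
}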

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 5 additions & 5 deletions

@@ -120,11 +120,11 @@ trait ExecutorRunnableUtil extends Logging {
         "-XX:OnOutOfMemoryError='kill %p'") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
-        masterAddress.toString,
-        slaveId.toString,
-        hostname.toString,
-        executorCores.toString,
-        appId,
+        "--driver-url", masterAddress.toString,
+        "--executor-id", slaveId.toString,
+        "--hostname", hostname.toString,
+        "--cores", executorCores.toString,
+        "--app-id", appId,
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
         "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
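On YARN this Seq is joined into the container launch command, shell redirections included. With invented values, the assembled line comes out roughly as:

java -server -XX:OnOutOfMemoryError='kill %p' \
  org.apache.spark.executor.CoarseGrainedExecutorBackend \
  --driver-url akka.tcp://sparkDriver@10.0.0.1:7077/user/CoarseGrainedScheduler \
  --executor-id 1 \
  --hostname nm-host-1 \
  --cores 2 \
  --app-id application_1414000000000_0001 \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr

where <LOG_DIR> is what ApplicationConstants.LOG_DIR_EXPANSION_VAR expands to, i.e. the container's log directory.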
