
Commit 148af60

andrewor14 authored and pwendell committed
[SPARK-2454] Do not ship spark home to Workers
When standalone Workers launch executors, they inherit the Spark home set by the driver. This means that if the worker machines do not share the driver node's directory structure, the Workers attempt to run scripts (e.g. bin/compute-classpath.sh) that do not exist locally, and fail. This is a common scenario when the driver is launched from outside the cluster. The fix is simply not to pass the driver's Spark home to the Workers. This PR also avoids overloading the usages of `spark.home`, which is now used only for setting the executor Spark home on Mesos and in Python.

This is based on top of #1392 and was originally reported by YanTangZhai. Tested on a standalone cluster.

Author: Andrew Or <[email protected]>

Closes #1734 from andrewor14/spark-home-reprise and squashes the following commits:

f71f391 [Andrew Or] Revert changes in python
1c2532c [Andrew Or] Merge branch 'master' of github.com:apache/spark into spark-home-reprise
188fc5d [Andrew Or] Avoid using spark.home where possible
09272b7 [Andrew Or] Always use Worker's working directory as spark home
1 parent d934801 commit 148af60
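To make the new behavior concrete, here is a minimal, self-contained sketch (the object name and printed message are illustrative, not part of this patch) of the lookup order the Worker now uses for its own Spark home, mirroring the Worker.scala change below:

import java.io.File

object SparkHomeLookupSketch {
  def main(args: Array[String]): Unit = {
    // Same precedence as the patched Worker: a test-only override first,
    // then the machine-local SPARK_HOME, then the Worker's working directory.
    val sparkHome = new File(
      sys.props.get("spark.test.home")
        .orElse(sys.env.get("SPARK_HOME"))
        .getOrElse("."))
    println(s"This machine would use Spark home: ${sparkHome.getAbsolutePath}")
  }
}

Note that the driver's value no longer enters this lookup at all; each Worker resolves Spark home from its own environment.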

File tree

13 files changed (+15 −26 lines)


core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala

Lines changed: 0 additions & 1 deletion
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
     ("user" -> obj.user) ~
-    ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
   }

core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala

Lines changed: 2 additions & 3 deletions
@@ -48,9 +48,8 @@ private[spark] object TestClient {
     val conf = new SparkConf
     val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
-    val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
-      Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+    val desc = new ApplicationDescription("TestClient", Some(1), 512,
+      Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 4 deletions
@@ -81,7 +81,8 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+  val sparkHome =
+    new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -233,9 +234,7 @@ private[spark] class Worker(
       try {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host,
-          appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-          workDir, akkaUrl, conf, ExecutorState.RUNNING)
+          self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 1 addition & 2 deletions
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

core/src/test/scala/org/apache/spark/DriverSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ import scala.language.postfixOps
 class DriverSuite extends FunSuite with Timeouts {

   test("driver should exit after finishing") {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {

   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
-    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
   }

   def createAppInfo() : ApplicationInfo = {
@@ -169,8 +169,7 @@ object JsonConstants {
   val appDescJsonStr =
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
-      |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
+      |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin

   val executorRunnerJsonStr =

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     Utils.executeAndGetOutput(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 3 additions & 4 deletions
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+    val sparkHome = sys.props("spark.test.home")
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
-      sparkHome, "appUiUrl")
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
       f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)

     assert(er.getCommandSeq.last === appId)

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion
@@ -328,7 +328,7 @@ object TestSettings {
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
     fork := true,
-    javaOptions in Test += "-Dspark.home=" + sparkHome,
+    javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
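On the test side, this build setting is consumed as an ordinary system property. A minimal sketch of that read follows (the object name and error message are illustrative; the suites above simply index sys.props directly):

object SparkTestHomeSketch {
  def main(args: Array[String]): Unit = {
    // Forked test JVMs receive -Dspark.test.home from the SBT settings above;
    // DriverSuite, SparkSubmitSuite and ExecutorRunnerTest read it as shown here.
    val sparkHome = sys.props.getOrElse("spark.test.home",
      sys.error("spark.test.home is not set; run this through the build"))
    println(s"Tests would launch scripts from: $sparkHome")
  }
}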
