Skip to content

Commit 4ab696a

Browse files
committed
bootstrap to retrieve driver spark conf
1 parent 3870248 commit 4ab696a

File tree

3 files changed

+37
-20
lines changed

3 files changed

+37
-20
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
package org.apache.spark.executor
1919

2020
import java.nio.ByteBuffer
21+
import java.util.concurrent.TimeUnit
22+
23+
import scala.concurrent.Await
2124

2225
import akka.actor._
2326
import akka.remote._
27+
import akka.pattern.Patterns
28+
import akka.util.Timeout
2429

2530
import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
2631
import org.apache.spark.TaskState.TaskState
@@ -101,26 +106,33 @@ private[spark] object CoarseGrainedExecutorBackend {
101106
workerUrl: Option[String]) {
102107

103108
SparkHadoopUtil.get.runAsSparkUser { () =>
104-
// Debug code
105-
Utils.checkHost(hostname)
106-
107-
val conf = new SparkConf
108-
// Create a new ActorSystem to run the backend, because we can't create a
109-
// SparkEnv / Executor before getting started with all our system properties, etc
110-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
111-
conf, new SecurityManager(conf))
112-
// set it
113-
val sparkHostPort = hostname + ":" + boundPort
114-
actorSystem.actorOf(
115-
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
116-
sparkHostPort, cores),
117-
name = "Executor")
118-
workerUrl.foreach {
119-
url =>
120-
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
121-
}
122-
actorSystem.awaitTermination()
123-
109+
// Debug code
110+
Utils.checkHost(hostname)
111+
112+
// Bootstrap to fetch the driver's Spark properties.
113+
val executorConf = new SparkConf
114+
val (fetcher, _) = AkkaUtils.createActorSystem(
115+
"driverConfFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
116+
val driver = fetcher.actorSelection(driverUrl)
117+
val timeout = new Timeout(5, TimeUnit.MINUTES)
118+
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
119+
val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]]
120+
fetcher.shutdown()
121+
122+
// Create a new ActorSystem to run the backend, because we can't create a
123+
// SparkEnv / Executor before getting started with all our system properties, etc
124+
val driverConf = new SparkConf().setAll(props)
125+
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
126+
"sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
127+
// set it
128+
val sparkHostPort = hostname + ":" + boundPort
129+
actorSystem.actorOf(
130+
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
131+
name = "Executor")
132+
workerUrl.foreach { url =>
133+
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
134+
}
135+
actorSystem.awaitTermination()
124136
}
125137
}
126138

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ import java.nio.ByteBuffer
2222
import org.apache.spark.TaskState.TaskState
2323
import org.apache.spark.scheduler.TaskDescription
2424
import org.apache.spark.util.{SerializableBuffer, Utils}
25+
import org.apache.spark.SparkConf
2526

2627
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
2728

2829
private[spark] object CoarseGrainedClusterMessages {
2930

31+
case object RetrieveSparkProps extends CoarseGrainedClusterMessage
32+
3033
// Driver to executors
3134
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
3235

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
124124
addressToExecutorId.get(address).foreach(removeExecutor(_,
125125
"remote Akka client disassociated"))
126126

127+
case RetrieveSparkProps =>
128+
sender ! sparkProperties
127129
}
128130

129131
// Make fake resource offers on all executors

0 commit comments

Comments
 (0)