1818package org .apache .spark .executor
1919
2020import java .nio .ByteBuffer
21- import java .util .concurrent .TimeUnit
2221
2322import scala .concurrent .Await
2423
2524import akka .actor ._
2625import akka .remote ._
2726import akka .pattern .Patterns
28- import akka .util .Timeout
2927
3028import org .apache .spark .{SparkEnv , Logging , SecurityManager , SparkConf }
3129import org .apache .spark .TaskState .TaskState
@@ -39,10 +37,8 @@ private[spark] class CoarseGrainedExecutorBackend(
3937 driverUrl : String ,
4038 executorId : String ,
4139 hostPort : String ,
42- cores : Int )
43- extends Actor
44- with ExecutorBackend
45- with Logging {
40+ cores : Int ,
41+ sparkProperties : Seq [(String , String )]) extends Actor with ExecutorBackend with Logging {
4642
4743 Utils .checkHostPort(hostPort, " Expected hostport" )
4844
@@ -57,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
5753 }
5854
5955 override def receive = {
60- case RegisteredExecutor (sparkProperties) =>
56+ case RegisteredExecutor =>
6157 logInfo(" Successfully registered with driver" )
6258 // Make this host instead of hostPort ?
6359 executor = new Executor (executorId, Utils .parseHostPort(hostPort)._1, sparkProperties,
@@ -114,20 +110,20 @@ private[spark] object CoarseGrainedExecutorBackend {
114110 val (fetcher, _) = AkkaUtils .createActorSystem(
115111 " driverPropsFetcher" , hostname, 0 , executorConf, new SecurityManager (executorConf))
116112 val driver = fetcher.actorSelection(driverUrl)
117- val timeout = new Timeout ( 5 , TimeUnit . MINUTES )
113+ val timeout = AkkaUtils .askTimeout(executorConf )
118114 val fut = Patterns .ask(driver, RetrieveSparkProps , timeout)
119- val props = Await .result(fut, timeout.duration ).asInstanceOf [Seq [(String , String )]]
115+ val props = Await .result(fut, timeout).asInstanceOf [Seq [(String , String )]]
120116 fetcher.shutdown()
121117
122- // Create a new ActorSystem to run the backend, because we can't create a
123- // SparkEnv / Executor before getting started with all our system properties, etc
118+ // Create a new ActorSystem using driver's Spark properties to run the backend.
124119 val driverConf = new SparkConf ().setAll(props)
125120 val (actorSystem, boundPort) = AkkaUtils .createActorSystem(
126121 " sparkExecutor" , hostname, 0 , driverConf, new SecurityManager (driverConf))
127122 // set it
128123 val sparkHostPort = hostname + " :" + boundPort
129124 actorSystem.actorOf(
130- Props (classOf [CoarseGrainedExecutorBackend ], driverUrl, executorId, sparkHostPort, cores),
125+ Props (classOf [CoarseGrainedExecutorBackend ],
126+ driverUrl, executorId, sparkHostPort, cores, props),
131127 name = " Executor" )
132128 workerUrl.foreach { url =>
133129 actorSystem.actorOf(Props (classOf [WorkerWatcher ], url), name = " WorkerWatcher" )
0 commit comments