@@ -68,13 +68,15 @@ class SparkEnv (
6868 val shuffleMemoryManager : ShuffleMemoryManager ,
6969 val conf : SparkConf ) extends Logging {
7070
71+ private [spark] var isStopped = false
7172 private val pythonWorkers = mutable.HashMap [(String , Map [String , String ]), PythonWorkerFactory ]()
7273
7374 // A general, soft-reference map for metadata needed during HadoopRDD split computation
7475 // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
7576 private [spark] val hadoopJobMetadata = new MapMaker ().softValues().makeMap[String , Any ]()
7677
7778 private [spark] def stop () {
79+ isStopped = true
7880 pythonWorkers.foreach { case (key, worker) => worker.stop() }
7981 Option (httpFileServer).foreach(_.stop())
8082 mapOutputTracker.stop()
@@ -142,24 +144,63 @@ object SparkEnv extends Logging {
142144 env
143145 }
144146
145- private [spark] def create (
/**
 * Create a SparkEnv for the driver.
 *
 * Requires that "spark.driver.host" and "spark.driver.port" are already set in `conf`;
 * fails fast with an assertion otherwise, since the env cannot be wired up without them.
 *
 * @param conf        the driver's Spark configuration
 * @param isLocal     whether Spark is running in local mode
 * @param listenerBus the listener bus (driver-only; must be non-null downstream)
 */
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val hostname = conf.get("spark.driver.host")
  val port = conf.get("spark.driver.port").toInt
  // "<driver>" is the conventional executor ID that identifies the driver itself.
  create(conf, "<driver>", hostname, port, isDriver = true, isLocal, listenerBus)
}
160+
/**
 * Create a SparkEnv for an executor.
 *
 * In coarse-grained mode the executor already owns a running actor system and passes it
 * in; otherwise `actorSystem` stays null and the shared helper creates a fresh one.
 *
 * @param conf        the executor's Spark configuration
 * @param executorId  identifier of this executor
 * @param hostname    host the executor is running on
 * @param port        port to bind (ignored when an actor system is supplied)
 * @param isLocal     whether Spark is running in local mode
 * @param actorSystem pre-instantiated actor system, or null to create one
 */
private[spark] def createExecutorEnv(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    isLocal: Boolean,
    actorSystem: ActorSystem = null): SparkEnv = {
  // Executors never carry a listener bus, hence isDriver = false and no listenerBus arg.
  create(
    conf,
    executorId,
    hostname,
    port,
    isDriver = false,
    isLocal = isLocal,
    defaultActorSystem = actorSystem)
}
174+
175+ /**
176+ * Helper method to create a SparkEnv for a driver or an executor.
177+ */
178+ private def create (
146179 conf : SparkConf ,
147180 executorId : String ,
148181 hostname : String ,
149182 port : Int ,
150183 isDriver : Boolean ,
151184 isLocal : Boolean ,
152- listenerBus : LiveListenerBus = null ): SparkEnv = {
185+ listenerBus : LiveListenerBus = null ,
186+ defaultActorSystem : ActorSystem = null ): SparkEnv = {
153187
154188 // Listener bus is only used on the driver
155189 if (isDriver) {
156190 assert(listenerBus != null , " Attempted to create driver SparkEnv with null listener bus!" )
157191 }
158192
159193 val securityManager = new SecurityManager (conf)
160- val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
161- val (actorSystem, boundPort) = AkkaUtils .createActorSystem(
162- actorSystemName, hostname, port, conf, securityManager)
194+
195+ // If an existing actor system is already provided, use it.
196+ // This is the case when an executor is launched in coarse-grained mode.
197+ val (actorSystem, boundPort) =
198+ Option (defaultActorSystem) match {
199+ case Some (as) => (as, port)
200+ case None =>
201+ val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
202+ AkkaUtils .createActorSystem(actorSystemName, hostname, port, conf, securityManager)
203+ }
163204
164205 // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
165206 // This is so that we tell the executors the correct port to connect to.
0 commit comments