
Commit 8c459ed

Use a common variable for driver/executor actor system names
1 parent 3a92843

File tree

10 files changed: 45 additions, 27 deletions


core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 1 deletion
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
   private val env = new ThreadLocal[SparkEnv]
   @volatile private var lastSetSparkEnv : SparkEnv = _
 
+  private[spark] val driverActorSystemName = "sparkDriver"
+  private[spark] val executorActorSystemName = "sparkExecutor"
+
   def set(e: SparkEnv) {
     lastSetSparkEnv = e
     env.set(e)
@@ -146,7 +149,7 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-    val actorSystemName = if (isDriver) "sparkDriver" else "sparkExecutor"
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       actorSystemName, hostname, port, conf, securityManager)
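
Every backend below repeats the same pattern: format an Akka URL that embeds the driver's actor system name, which driver and executors must agree on exactly. A minimal, self-contained sketch of that pattern (the object name, helper, and example values are illustrative, not part of this patch):

    object DriverUrlExample {
      // Mirrors the constant introduced above in SparkEnv.
      private val driverActorSystemName = "sparkDriver"

      // Builds e.g. "akka.tcp://sparkDriver@localhost:7077/user/CoarseGrainedScheduler".
      def buildDriverUrl(host: String, port: Int, actorName: String): String =
        "akka.tcp://%s@%s:%s/user/%s".format(driverActorSystemName, host, port, actorName)

      def main(args: Array[String]): Unit =
        // Example values only; real callers read spark.driver.host/port from conf.
        println(buildDriverUrl("localhost", 7077, "CoarseGrainedScheduler"))
    }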

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 5 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
-      sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sc.conf.get("spark.driver.host"),
+      sc.conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     val conf = new Configuration()

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 5 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      conf.get("spark.driver.host"),
+      conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 2 deletions
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 
 /**
  * Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
   }
 
   def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+    val driverActorSystemName = SparkEnv.driverActorSystemName
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/$name"
+    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
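
Unlike the lazy actorSelection used in the receiver path below, makeDriverRef blocks until the driver-side actor actually resolves, or fails once the lookup timeout expires. A hedged usage sketch; the actor name here is an example, not taken from this diff:

    // Resolve a live ActorRef for a driver-side actor; throws if the driver
    // is unreachable within the Akka lookup timeout.
    val driverActor = AkkaUtils.makeDriverRef("MapOutputTracker", conf, actorSystem)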

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 5 additions & 5 deletions
@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
 
 import akka.actor.{Actor, Props}
 import akka.pattern.ask
 
+import com.google.common.base.Throwables
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler.DeregisterReceiver
 import org.apache.spark.streaming.scheduler.AddBlock
-import scala.Some
 import org.apache.spark.streaming.scheduler.RegisterReceiver
-import com.google.common.base.Throwables
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
   private val trackerActor = {
     val ip = env.conf.get("spark.driver.host", "localhost")
     val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = "akka.tcp://sparkDriver@%s:%s/user/ReceiverTracker".format(ip, port)
+    val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
+      SparkEnv.driverActorSystemName, ip, port)
     env.actorSystem.actorSelection(url)
   }
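
For context, actorSelection builds a handle to the remote path without contacting it; messages sent before the driver is up simply go to dead letters. A standalone sketch of selecting the tracker by URL (assumes Akka 2.2-era APIs on the classpath; host and port are example values):

    import akka.actor.ActorSystem

    object SelectTrackerExample {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("sparkExecutor")
        val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
          "sparkDriver", "localhost", 7077)
        // actorSelection is lazy: no connection attempt happens until a
        // message is sent or the selection is resolved.
        println(system.actorSelection(url))
        system.shutdown()
      }
    }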

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 6 additions & 3 deletions
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -229,8 +229,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 5 additions & 3 deletions
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
     // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
     // (executorIdCounter)
     val executorId = executorIdCounter.incrementAndGet().toString
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
-      sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sparkConf.get("spark.driver.host"),
+      sparkConf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     logInfo("launching container on " + containerId + " host " + executorHostname)

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 6 additions & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -193,8 +193,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
       numExecutorsRunning.decrementAndGet()
     } else {
       val executorId = executorIdCounter.incrementAndGet().toString
-      val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+      val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+        SparkEnv.driverActorSystemName,
         sparkConf.get("spark.driver.host"),
         sparkConf.get("spark.driver.port"),
         CoarseGrainedSchedulerBackend.ACTOR_NAME)
