
Commit b776817

Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint

1 parent b65bad6 · commit b776817

26 files changed: +729, -610 lines changed

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 68 additions & 35 deletions
@@ -17,33 +17,38 @@
 
 package org.apache.spark.deploy
 
-import scala.concurrent._
+import scala.concurrent.ExecutionContext
+import scala.util.{Failure, Success}
 
-import akka.actor._
-import akka.pattern.ask
-import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.log4j.{Level, Logger}
 
+import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * Proxy that relays messages to the driver.
  */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
-  extends Actor with ActorLogReceive with Logging {
-
-  var masterActor: ActorSelection = _
-  val timeout = AkkaUtils.askTimeout(conf)
-
-  override def preStart(): Unit = {
-    masterActor = context.actorSelection(
-      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
-
-    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+private class ClientEndpoint(
+    override val rpcEnv: RpcEnv,
+    driverArgs: ClientArguments,
+    masterEndpoint: RpcEndpointRef,
+    conf: SparkConf)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  private val forwardMessageThread = Utils.newDaemonFixedThreadPool(1, "client-forward-message")
+  private implicit val forwardMessageExecutionContext =
+    ExecutionContext.fromExecutor(forwardMessageThread,
+      t => t match {
+        case ie: InterruptedException => // Exit normally
+        case e =>
+          e.printStackTrace()
+          System.exit(-1)
+      })
 
+  override def onStart(): Unit = {
     println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
 
     driverArgs.cmd match {
@@ -79,22 +84,36 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
           driverArgs.supervise,
          command)
 
-        masterActor ! RequestSubmitDriver(driverDescription)
+        masterEndpoint.sendWithReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription)).
+          onComplete {
+            case Success(v) => self.send(v)
+            case Failure(e) =>
+              println(s"Error sending messages to master ${driverArgs.master}, exiting.")
+              e.printStackTrace()
+              System.exit(-1)
+          }
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        masterActor ! RequestKillDriver(driverId)
+        masterEndpoint.sendWithReply[KillDriverResponse](RequestKillDriver(driverId)).onComplete {
+          case Success(v) => self.send(v)
+          case Failure(e) =>
+            println(s"Error sending messages to master ${driverArgs.master}, exiting.")
+            e.printStackTrace()
+            System.exit(-1)
+        }
     }
   }
 
   /* Find out driver status then exit the JVM */
  def pollAndReportStatus(driverId: String) {
+    // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
+    // is fine.
    println("... waiting before polling master for driver state")
    Thread.sleep(5000)
    println("... polling master for driver state")
-    val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
-      .mapTo[DriverStatusResponse]
-    val statusResponse = Await.result(statusFuture, timeout)
+    val statusResponse =
+      masterEndpoint.askWithReply[DriverStatusResponse](RequestDriverStatus(driverId))
 
    statusResponse.found match {
      case false =>
@@ -118,7 +137,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
    }
  }
 
-  override def receiveWithLogging: PartialFunction[Any, Unit] = {
+  override def receive: PartialFunction[Any, Unit] = {
 
    case SubmitDriverResponse(success, driverId, message) =>
      println(message)
@@ -128,14 +147,27 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
      println(message)
      if (success) pollAndReportStatus(driverId) else System.exit(-1)
 
-    case DisassociatedEvent(_, remoteAddress, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      System.exit(-1)
+  }
+
+  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+    println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+    System.exit(-1)
+  }
+
+  override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+    println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+    cause.printStackTrace()
+    System.exit(-1)
+  }
+
+  override def onError(cause: Throwable): Unit = {
+    println(s"Error processing messages, exiting.")
+    cause.printStackTrace()
+    System.exit(-1)
+  }
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      println(s"Cause was: $cause")
-      System.exit(-1)
+  override def onStop(): Unit = {
+    forwardMessageThread.shutdownNow()
  }
 }
 
@@ -159,13 +191,14 @@ object Client {
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
-    val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
+    val rpcEnv =
+      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
-    // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
-    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
+    val masterAddress = RpcAddress.fromSparkURL(driverArgs.master)
+    val masterEndpoint =
+      rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoint, conf))
 
-    actorSystem.awaitTermination()
+    rpcEnv.awaitTermination()
  }
 }
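
Note: ClientEndpoint above shows the full RpcEndpoint lifecycle this commit adopts (onStart, receive, onDisconnected/onNetworkError/onError, onStop). Below is a minimal standalone sketch of that pattern, using only the API surface visible in this diff (RpcEnv.create, setupEndpoint, send, awaitTermination); the Ping message, PingEndpoint, and PingApp names are hypothetical, not part of this commit.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEnv, ThreadSafeRpcEndpoint}

// Hypothetical one-way message, for illustration only.
case class Ping(id: Int)

class PingEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  // Setup that ClientActor did in preStart now happens in onStart.
  override def onStart(): Unit = println("endpoint started")

  // One-way messages (endpointRef.send) land here; a ThreadSafeRpcEndpoint
  // processes them one at a time, so no extra synchronization is needed.
  override def receive: PartialFunction[Any, Unit] = {
    case Ping(id) => println(s"ping $id")
  }

  // Replaces Akka's DisassociatedEvent handling.
  override def onDisconnected(remoteAddress: RpcAddress): Unit =
    println(s"lost connection to $remoteAddress")

  override def onStop(): Unit = println("endpoint stopped")
}

object PingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val rpcEnv = RpcEnv.create("ping", "localhost", 0, conf, new SecurityManager(conf))
    val ref = rpcEnv.setupEndpoint("ping", new PingEndpoint(rpcEnv))
    ref.send(Ping(1)) // replaces Akka's `actor ! Ping(1)`
    rpcEnv.awaitTermination()
  }
}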

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 9 additions & 7 deletions
@@ -24,11 +24,12 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
 private[deploy] sealed trait DeployMessage extends Serializable
 
-/** Contains messages sent between Scheduler actor nodes. */
+/** Contains messages sent between Scheduler endpoint nodes. */
 private[deploy] object DeployMessages {
 
   // Worker to Master
@@ -37,6 +38,7 @@ private[deploy] object DeployMessages {
       id: String,
       host: String,
       port: Int,
+      worker: RpcEndpointRef,
       cores: Int,
       memory: Int,
       webUiPort: Int,
@@ -63,11 +65,11 @@ private[deploy] object DeployMessages {
   case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
      driverIds: Seq[String])
 
-  case class Heartbeat(workerId: String) extends DeployMessage
+  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 
   // Master to Worker
 
-  case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
+  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage
 
@@ -92,13 +94,13 @@ private[deploy] object DeployMessages {
 
   // Worker internal
 
-  case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+  case object WorkDirCleanup // Sent to Worker endpoint periodically for cleaning up app folders
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
 
   // AppClient to Master
 
-  case class RegisterApplication(appDescription: ApplicationDescription)
+  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
     extends DeployMessage
 
   case class UnregisterApplication(appId: String)
@@ -107,7 +109,7 @@ private[deploy] object DeployMessages {
 
   // Master to AppClient
 
-  case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
+  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
 
   // TODO(matei): replace hostPort with host
   case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -142,7 +144,7 @@ private[deploy] object DeployMessages {
 
   // Master to Worker & AppClient
 
-  case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
+  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
 
   // MasterWebUI To Master
 
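
The pattern running through this file: messages that previously identified a peer by URL or id now carry an RpcEndpointRef, because the RpcEndpoint model has no Akka-style implicit sender() for one-way messages. A hedged sketch of how a receiver can use the carried ref; HeartbeatAck and MasterLikeEndpoint are hypothetical and not part of this commit.

package org.apache.spark.deploy

import org.apache.spark.deploy.DeployMessages.Heartbeat
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

// Hypothetical ack message, for illustration only.
case class HeartbeatAck(workerId: String)

// Sketch of a master-side handler for the new two-field Heartbeat. With Akka
// the receiver could answer via sender(); here the worker embeds its own
// RpcEndpointRef in the message and the receiver replies through it.
private class MasterLikeEndpoint(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint {

  override def receive: PartialFunction[Any, Unit] = {
    case Heartbeat(workerId, workerRef) =>
      workerRef.send(HeartbeatAck(workerId))
  }
}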

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 12 additions & 13 deletions
@@ -19,8 +19,7 @@ package org.apache.spark.deploy
 
 import scala.collection.mutable.ArrayBuffer
 
-import akka.actor.ActorSystem
-
+import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.master.Master
@@ -41,8 +40,8 @@ class LocalSparkCluster(
   extends Logging {
 
   private val localHostname = Utils.localHostName()
-  private val masterActorSystems = ArrayBuffer[ActorSystem]()
-  private val workerActorSystems = ArrayBuffer[ActorSystem]()
+  private val masterRpcEnvs = ArrayBuffer[RpcEnv]()
+  private val workerRpcEnvs = ArrayBuffer[RpcEnv]()
 
   def start(): Array[String] = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -51,16 +50,16 @@
     val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
 
     /* Start the Master */
-    val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
-    masterActorSystems += masterSystem
-    val masterUrl = "spark://" + localHostname + ":" + masterPort
+    val (rpcEnv, _, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
+    masterRpcEnvs += rpcEnv
+    val masterUrl = "spark://" + localHostname + ":" + rpcEnv.address.port
     val masters = Array(masterUrl)
 
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+      val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
         memoryPerWorker, masters, null, Some(workerNum), _conf)
-      workerActorSystems += workerSystem
+      workerRpcEnvs += workerEnv
     }
 
     masters
@@ -71,11 +70,11 @@
     // Stop the workers before the master so they don't get upset that it disconnected
     // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
     // This is unfortunate, but for now we just comment it out.
-    workerActorSystems.foreach(_.shutdown())
+    workerRpcEnvs.foreach(_.shutdown())
     // workerActorSystems.foreach(_.awaitTermination())
-    masterActorSystems.foreach(_.shutdown())
+    masterRpcEnvs.foreach(_.shutdown())
     // masterActorSystems.foreach(_.awaitTermination())
-    masterActorSystems.clear()
-    workerActorSystems.clear()
+    masterRpcEnvs.clear()
+    workerRpcEnvs.clear()
   }
 }
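
For context, a hedged usage sketch of LocalSparkCluster after this change. The constructor signature (numWorkers, coresPerWorker, memoryPerWorker, conf) is inferred from the fields referenced above, and the demo is placed in the deploy package in case the class is package-private; neither detail is confirmed by this diff.

package org.apache.spark.deploy

import org.apache.spark.SparkConf

object LocalClusterDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Assumed arguments: 2 workers, 1 core and 512 MB of memory per worker.
    val cluster = new LocalSparkCluster(2, 1, 512, conf)

    // start() now boots one master RpcEnv plus one RpcEnv per worker (instead
    // of ActorSystems) and returns the master URL(s), e.g. "spark://host:port".
    val masters = cluster.start()
    println(masters.mkString(", "))

    // stop() shuts down the worker RpcEnvs before the master's, matching the
    // ordering comment in the diff above.
    cluster.stop()
  }
}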
