
Commit a2d2ec8

SPARK-11402: Use ChildRunnerProvider to create ExecutorRunner and DriverRunner
Abstracted ExecutorRunner and DriverRunner. The current implementations were renamed to ExecutorRunnerImpl and DriverRunnerImpl respectively. Added a way to provide a custom implementation of the runners by defining their factories.
1 parent f79ebf2 commit a2d2ec8

17 files changed: +524, -261 lines changed

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ private[spark] class ApplicationDescription(
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None,
     val coresPerExecutor: Option[Int] = None)
-  extends Serializable {
+  extends Description with Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
 

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.RecoveryState.MasterState
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.deploy.worker.{DriverRunnerInfo, ExecutorRunnerInfo}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -188,8 +188,8 @@ private[deploy] object DeployMessages {
   // Worker to WorkerWebUI
 
   case class WorkerStateResponse(host: String, port: Int, workerId: String,
-    executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
-    drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
+    executors: List[ExecutorRunnerInfo], finishedExecutors: List[ExecutorRunnerInfo],
+    drivers: List[DriverRunnerInfo], finishedDrivers: List[DriverRunnerInfo], masterUrl: String,
     cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
 
     Utils.checkHost(host, "Required hostname")
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+trait Description {
+  def command: Command
+}
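The new Description trait gives ApplicationDescription and DriverDescription a common supertype whose only contract is the launch command. A minimal sketch of code written once against the trait (DescriptionUtils and mainClassOf are illustrative names, not part of this commit):

package org.apache.spark.deploy

// Hypothetical helper, not in this commit: works for any Description,
// whether it is a DriverDescription or an ApplicationDescription.
private[deploy] object DescriptionUtils {
  def mainClassOf(desc: Description): String = desc.command.mainClass
}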

core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala

Lines changed: 6 additions & 15 deletions
@@ -17,21 +17,12 @@
 
 package org.apache.spark.deploy
 
-private[deploy] class DriverDescription(
-    val jarUrl: String,
-    val mem: Int,
-    val cores: Int,
-    val supervise: Boolean,
-    val command: Command)
-  extends Serializable {
-
-  def copy(
-      jarUrl: String = jarUrl,
-      mem: Int = mem,
-      cores: Int = cores,
-      supervise: Boolean = supervise,
-      command: Command = command): DriverDescription =
-    new DriverDescription(jarUrl, mem, cores, supervise, command)
+private[deploy] case class DriverDescription(
+    jarUrl: String,
+    mem: Int,
+    cores: Int,
+    supervise: Boolean,
+    command: Command) extends Description {
 
   override def toString: String = s"DriverDescription (${command.mainClass})"
 }
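Converting DriverDescription to a case class is what allows the hand-written copy method above to be deleted: the compiler generates copy with per-field defaults, along with equals, hashCode, and Serializable. A minimal usage sketch, with DriverDescriptionExample and withSupervision as made-up names:

package org.apache.spark.deploy

// Illustrative only: the compiler-generated copy replaces the removed hand-written one.
private[deploy] object DriverDescriptionExample {
  def withSupervision(desc: DriverDescription): DriverDescription =
    desc.copy(supervise = true)
}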

core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala

Lines changed: 5 additions & 6 deletions
@@ -22,12 +22,11 @@ package org.apache.spark.deploy
  * This state is sufficient for the Master to reconstruct its internal data structures during
  * failover.
  */
-private[deploy] class ExecutorDescription(
-    val appId: String,
-    val execId: Int,
-    val cores: Int,
-    val state: ExecutorState.Value)
-  extends Serializable {
+private[deploy] case class ExecutorDescription(
+    appId: String,
+    execId: Int,
+    cores: Int,
+    state: ExecutorState.Value) {
 
   override def toString: String =
     "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala

Lines changed: 5 additions & 5 deletions
@@ -22,7 +22,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.deploy.worker.ExecutorRunnerInfo
 
 private[deploy] object JsonProtocol {
   def writeWorkerInfo(obj: WorkerInfo): JObject = {
@@ -60,11 +60,11 @@ private[deploy] object JsonProtocol {
     ("command" -> obj.command.toString)
   }
 
-  def writeExecutorRunner(obj: ExecutorRunner): JObject = {
-    ("id" -> obj.execId) ~
-    ("memory" -> obj.memory) ~
+  def writeExecutorRunner(obj: ExecutorRunnerInfo): JObject = {
+    ("id" -> obj.setup.id) ~
+    ("memory" -> obj.setup.memory) ~
     ("appid" -> obj.appId) ~
-    ("appdesc" -> writeApplicationDescription(obj.appDesc))
+    ("appdesc" -> writeApplicationDescription(obj.setup.description))
   }
 
   def writeDriverInfo(obj: DriverInfo): JObject = {

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ class LocalSparkCluster(
 
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
+      val (workerEnv, _) = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
        memoryPerWorker, masters, null, Some(workerNum), _conf)
      workerRpcEnvs += workerEnv
    }
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.deploy.{DriverDescription, Description, ApplicationDescription, ExecutorState}
+
+private[deploy] trait ChildRunnerFactory[D <: Description, T <: ChildRunnerInfo[D]] {
+  def createRunner(
+      appId: Option[String],
+      processSetup: ChildProcessCommonSetup[D],
+      workerSetup: WorkerSetup,
+      stateChangeListener: StateChangeListener[D, T],
+      localDirs: Seq[String]): ChildProcessRunner[D, T]
+}
+
+private[deploy] object DriverRunnerFactoryImpl
+  extends ChildRunnerFactory[DriverDescription, DriverRunnerInfo] {
+
+  override def createRunner(
+      appId: Option[String],
+      processSetup: ChildProcessCommonSetup[DriverDescription],
+      workerSetup: WorkerSetup,
+      stateChangeListener: StateChangeListener[DriverDescription, DriverRunnerInfo],
+      localDirs: Seq[String]): DriverRunner = {
+
+    val manager = new DriverRunnerImpl(
+      processSetup,
+      workerSetup,
+      stateChangeListener)
+
+    manager
+  }
+}
+
+private[deploy] object ExecutorRunnerFactoryImpl
+  extends ChildRunnerFactory[ApplicationDescription, ExecutorRunnerInfo] {
+
+  override def createRunner(
+      appId: Option[String],
+      processSetup: ChildProcessCommonSetup[ApplicationDescription],
+      workerSetup: WorkerSetup,
+      stateChangeListener:
+        StateChangeListener[ApplicationDescription, ExecutorRunnerInfo],
+      localDirs: Seq[String]): ExecutorRunner = {
+
+    val manager = new ExecutorRunnerImpl(
+      processSetup,
+      workerSetup,
+      stateChangeListener,
+      appId.get,
+      localDirs,
+      ExecutorState.LOADING)
+
+    manager
+  }
+
+}
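These factory objects are the pluggable point the commit description refers to: a custom deployment can supply its own ChildRunnerFactory instead of the stock ones. A rough sketch of such a provider, assuming only the members shown in this diff (LoggingExecutorRunnerFactory is a made-up name and the delegation-plus-logging behaviour is purely illustrative):

package org.apache.spark.deploy.worker

import org.apache.spark.Logging
import org.apache.spark.deploy.ApplicationDescription

// Hypothetical custom provider, not in this commit: it reuses the default
// ExecutorRunnerFactoryImpl but logs every runner it hands out. Any object
// implementing ChildRunnerFactory could be plugged in the same way.
private[deploy] object LoggingExecutorRunnerFactory
  extends ChildRunnerFactory[ApplicationDescription, ExecutorRunnerInfo] with Logging {

  override def createRunner(
      appId: Option[String],
      processSetup: ChildProcessCommonSetup[ApplicationDescription],
      workerSetup: WorkerSetup,
      stateChangeListener: StateChangeListener[ApplicationDescription, ExecutorRunnerInfo],
      localDirs: Seq[String]): ChildProcessRunner[ApplicationDescription, ExecutorRunnerInfo] = {
    val app = appId.getOrElse("unknown")
    logInfo(s"Creating executor runner ${processSetup.id} for application $app")
    // Delegate to the stock factory; a real provider could build any runner here.
    ExecutorRunnerFactoryImpl.createRunner(
      appId, processSetup, workerSetup, stateChangeListener, localDirs)
  }
}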

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala renamed to core/src/main/scala/org/apache/spark/deploy/worker/DriverRunnerImpl.scala

Lines changed: 37 additions & 42 deletions
@@ -24,29 +24,28 @@ import scala.collection.JavaConverters._
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
 
-import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
-import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.util.{Utils, Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * Manages the execution of one driver, including automatically restarting the driver on failure.
  * This is currently only used in standalone cluster deploy mode.
  */
-private[deploy] class DriverRunner(
-    conf: SparkConf,
-    val driverId: String,
-    val workDir: File,
-    val sparkHome: File,
-    val driverDesc: DriverDescription,
-    val worker: RpcEndpointRef,
-    val workerUrl: String,
-    val securityManager: SecurityManager)
-  extends Logging {
+private[deploy] class DriverRunnerImpl(
+    processSetup: ChildProcessCommonSetup[DriverDescription],
+    workerSetup: WorkerSetup,
+    stateChangeListener: StateChangeListener[DriverDescription, DriverRunnerInfo])
+  extends ChildProcessRunner[DriverDescription, DriverRunnerInfo]
+  with DriverRunnerInfo with Logging { self =>
+
+  override def state: DriverState = finalState.getOrElse(DriverState.RUNNING)
+  override def info: DriverRunnerImpl = this
+  override def exception: Option[Exception] = finalException
+  override def setup: ChildProcessCommonSetup[DriverDescription] = processSetup
 
   @volatile private var process: Option[Process] = None
   @volatile private var killed = false
@@ -71,23 +70,26 @@ private[deploy] class DriverRunner(
   }
 
   /** Starts a thread to run and manage the driver. */
-  private[worker] def start() = {
-    new Thread("DriverRunner for " + driverId) {
+  def start(): Unit = {
+    new Thread("DriverRunner for " + processSetup.id) {
       override def run() {
         try {
-          val driverDir = createWorkingDirectory()
-          val localJarFilename = downloadUserJar(driverDir)
+          val localJarFilename = downloadUserJar(processSetup.workDir)
 
           def substituteVariables(argument: String): String = argument match {
-            case "{{WORKER_URL}}" => workerUrl
+            case "{{WORKER_URL}}" => workerSetup.workerUri
            case "{{USER_JAR}}" => localJarFilename
            case other => other
          }
 
          // TODO: If we add ability to submit multiple jars they should also be added here
-          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
-            driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
-          launchDriver(builder, driverDir, driverDesc.supervise)
+          val builder = CommandUtils.buildProcessBuilder(
+            processSetup.description.command,
+            workerSetup.securityManager,
+            processSetup.memory,
+            workerSetup.sparkHome.getAbsolutePath,
+            substituteVariables)
+          launchDriver(builder, processSetup.workDir, processSetup.description.supervise)
        }
        catch {
          case e: Exception => finalException = Some(e)
@@ -107,39 +109,28 @@ private[deploy] class DriverRunner(
 
        finalState = Some(state)
 
-        worker.send(DriverStateChanged(driverId, state, finalException))
+        stateChangeListener(self, None,
+          finalException orElse finalExitCode.filter(_ != 0).map(new NonZeroExitCodeException(_)))
      }
    }.start()
  }
 
   /** Terminate this driver (or prevent it from ever starting if not yet started) */
-  private[worker] def kill() {
+  def kill(): Unit = {
    synchronized {
      process.foreach(p => p.destroy())
      killed = true
    }
  }
 
-  /**
-   * Creates the working directory for this driver.
-   * Will throw an exception if there are errors preparing the directory.
-   */
-  private def createWorkingDirectory(): File = {
-    val driverDir = new File(workDir, driverId)
-    if (!driverDir.exists() && !driverDir.mkdirs()) {
-      throw new IOException("Failed to create directory " + driverDir)
-    }
-    driverDir
-  }
-
   /**
    * Download the user jar into the supplied directory and return its local path.
   * Will throw an exception if there are errors downloading the jar.
   */
   private def downloadUserJar(driverDir: File): String = {
-    val jarPath = new Path(driverDesc.jarUrl)
+    val jarPath = new Path(processSetup.description.jarUrl)
 
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(workerSetup.conf)
    val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
    val jarFileName = jarPath.getName
    val localJarFile = new File(driverDir, jarFileName)
@@ -148,10 +139,10 @@ private[deploy] class DriverRunner(
    if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
      logInfo(s"Copying user jar $jarPath to $destPath")
      Utils.fetchFile(
-        driverDesc.jarUrl,
+        processSetup.description.jarUrl,
        driverDir,
-        conf,
-        securityManager,
+        workerSetup.conf,
+        workerSetup.securityManager,
        hadoopConf,
        System.currentTimeMillis(),
        useCache = false)
@@ -180,7 +171,7 @@ private[deploy] class DriverRunner(
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }
 
-  def runCommandWithRetry(
+  private[spark] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
    // Time to wait between submission retries.
    var waitSeconds = 1
@@ -232,3 +223,7 @@ private[deploy] object ProcessBuilderLike {
    override def command: Seq[String] = processBuilder.command().asScala
  }
 }
+
+private[deploy] trait DriverRunnerInfo extends ChildRunnerInfo[DriverDescription] {
+  def state: DriverState
+}
