Skip to content

Commit 6ff088d

Browse files
author
Andrew Or
committed
Rename classes to generalize REST protocol
Previously the REST protocol was very explicitly tied to the standalone mode. This commit frees the protocol from this restriction.
1 parent af9d9cb commit 6ff088d

15 files changed

+466
-430
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,10 @@ object SparkSubmit {
7575
}
7676

7777
// In standalone cluster mode, use the brand new REST client to submit the application
78-
val doingRest = appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
79-
if (doingRest) {
80-
println("Submitting driver through the REST interface.")
78+
val isStandaloneCluster =
79+
appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
80+
if (isStandaloneCluster) {
8181
new StandaloneRestClient().submitDriver(appArgs)
82-
println("Done submitting driver.")
8382
return
8483
}
8584

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ private[spark] class Master(
122122
throw new SparkException("spark.deploy.defaultCores must be positive")
123123
}
124124

125-
val restServer = new StandaloneRestServer(this, host, 6677)
125+
// Alternative application submission gateway that is stable across Spark versions
126+
private val restServerPort = conf.getInt("spark.master.rest.port", 17077)
127+
private val restServer = new StandaloneRestServer(this, host, restServerPort)
128+
restServer.start()
126129

127130
override def preStart() {
128131
logInfo("Starting Spark master at " + masterUrl)

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in a DriverStatusRequestMessage.
2222
*/
23-
private[spark] abstract class DriverStatusRequestField extends StandaloneRestProtocolField
24-
private[spark] object DriverStatusRequestField extends StandaloneRestProtocolFieldCompanion {
23+
private[spark] abstract class DriverStatusRequestField extends SubmitRestProtocolField
24+
private[spark] object DriverStatusRequestField extends SubmitRestProtocolFieldCompanion {
2525
case object ACTION extends DriverStatusRequestField
2626
case object SPARK_VERSION extends DriverStatusRequestField
2727
case object MESSAGE extends DriverStatusRequestField
@@ -32,16 +32,16 @@ private[spark] object DriverStatusRequestField extends StandaloneRestProtocolFie
3232
}
3333

3434
/**
35-
* A request sent to the standalone Master to query the status of a driver.
35+
* A request sent to the cluster manager to query the status of a driver.
3636
*/
37-
private[spark] class DriverStatusRequestMessage extends StandaloneRestProtocolMessage(
38-
StandaloneRestProtocolAction.DRIVER_STATUS_REQUEST,
37+
private[spark] class DriverStatusRequestMessage extends SubmitRestProtocolMessage(
38+
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
3939
DriverStatusRequestField.ACTION,
4040
DriverStatusRequestField.requiredFields)
4141

42-
private[spark] object DriverStatusRequestMessage extends StandaloneRestProtocolMessageCompanion {
43-
protected override def newMessage(): StandaloneRestProtocolMessage =
42+
private[spark] object DriverStatusRequestMessage extends SubmitRestProtocolMessageCompanion {
43+
protected override def newMessage(): SubmitRestProtocolMessage =
4444
new DriverStatusRequestMessage
45-
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
45+
protected override def fieldWithName(field: String): SubmitRestProtocolField =
4646
DriverStatusRequestField.withName(field)
4747
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in a DriverStatusResponseMessage.
2222
*/
23-
private[spark] abstract class DriverStatusResponseField extends StandaloneRestProtocolField
24-
private[spark] object DriverStatusResponseField extends StandaloneRestProtocolFieldCompanion {
23+
private[spark] abstract class DriverStatusResponseField extends SubmitRestProtocolField
24+
private[spark] object DriverStatusResponseField extends SubmitRestProtocolFieldCompanion {
2525
case object ACTION extends DriverStatusResponseField
2626
case object SPARK_VERSION extends DriverStatusResponseField
2727
case object MESSAGE extends DriverStatusResponseField
@@ -36,16 +36,16 @@ private[spark] object DriverStatusResponseField extends StandaloneRestProtocolFi
3636
}
3737

3838
/**
39-
* A message sent from the standalone Master in response to a DriverStatusResponseMessage.
39+
* A message sent from the cluster manager in response to a DriverStatusResponseMessage.
4040
*/
41-
private[spark] class DriverStatusResponseMessage extends StandaloneRestProtocolMessage(
42-
StandaloneRestProtocolAction.DRIVER_STATUS_RESPONSE,
41+
private[spark] class DriverStatusResponseMessage extends SubmitRestProtocolMessage(
42+
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
4343
DriverStatusResponseField.ACTION,
4444
DriverStatusResponseField.requiredFields)
4545

46-
private[spark] object DriverStatusResponseMessage extends StandaloneRestProtocolMessageCompanion {
47-
protected override def newMessage(): StandaloneRestProtocolMessage =
46+
private[spark] object DriverStatusResponseMessage extends SubmitRestProtocolMessageCompanion {
47+
protected override def newMessage(): SubmitRestProtocolMessage =
4848
new DriverStatusResponseMessage
49-
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
49+
protected override def fieldWithName(field: String): SubmitRestProtocolField =
5050
DriverStatusResponseField.withName(field)
5151
}

core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in a ErrorMessage.
2222
*/
23-
private[spark] abstract class ErrorField extends StandaloneRestProtocolField
24-
private[spark] object ErrorField extends StandaloneRestProtocolFieldCompanion {
23+
private[spark] abstract class ErrorField extends SubmitRestProtocolField
24+
private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion {
2525
case object ACTION extends ErrorField
2626
case object SPARK_VERSION extends ErrorField
2727
case object MESSAGE extends ErrorField
@@ -30,15 +30,15 @@ private[spark] object ErrorField extends StandaloneRestProtocolFieldCompanion {
3030
}
3131

3232
/**
33-
* An error message exchanged in the standalone REST protocol.
33+
* An error message exchanged in the stable application submission protocol.
3434
*/
35-
private[spark] class ErrorMessage extends StandaloneRestProtocolMessage(
36-
StandaloneRestProtocolAction.ERROR,
35+
private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
36+
SubmitRestProtocolAction.ERROR,
3737
ErrorField.ACTION,
3838
ErrorField.requiredFields)
3939

40-
private[spark] object ErrorMessage extends StandaloneRestProtocolMessageCompanion {
41-
protected override def newMessage(): StandaloneRestProtocolMessage = new ErrorMessage
42-
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
40+
private[spark] object ErrorMessage extends SubmitRestProtocolMessageCompanion {
41+
protected override def newMessage(): SubmitRestProtocolMessage = new ErrorMessage
42+
protected override def fieldWithName(field: String): SubmitRestProtocolField =
4343
ErrorField.withName(field)
4444
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in a KillDriverRequestMessage.
2222
*/
23-
private[spark] abstract class KillDriverRequestField extends StandaloneRestProtocolField
24-
private[spark] object KillDriverRequestField extends StandaloneRestProtocolFieldCompanion {
23+
private[spark] abstract class KillDriverRequestField extends SubmitRestProtocolField
24+
private[spark] object KillDriverRequestField extends SubmitRestProtocolFieldCompanion {
2525
case object ACTION extends KillDriverRequestField
2626
case object SPARK_VERSION extends KillDriverRequestField
2727
case object MESSAGE extends KillDriverRequestField
@@ -32,16 +32,16 @@ private[spark] object KillDriverRequestField extends StandaloneRestProtocolField
3232
}
3333

3434
/**
35-
* A request sent to the standalone Master to kill a driver.
35+
* A request sent to the cluster manager to kill a driver.
3636
*/
37-
private[spark] class KillDriverRequestMessage extends StandaloneRestProtocolMessage(
38-
StandaloneRestProtocolAction.KILL_DRIVER_REQUEST,
37+
private[spark] class KillDriverRequestMessage extends SubmitRestProtocolMessage(
38+
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
3939
KillDriverRequestField.ACTION,
4040
KillDriverRequestField.requiredFields)
4141

42-
private[spark] object KillDriverRequestMessage extends StandaloneRestProtocolMessageCompanion {
43-
protected override def newMessage(): StandaloneRestProtocolMessage =
42+
private[spark] object KillDriverRequestMessage extends SubmitRestProtocolMessageCompanion {
43+
protected override def newMessage(): SubmitRestProtocolMessage =
4444
new KillDriverRequestMessage
45-
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
45+
protected override def fieldWithName(field: String): SubmitRestProtocolField =
4646
KillDriverRequestField.withName(field)
4747
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in a KillDriverResponseMessage.
2222
*/
23-
private[spark] abstract class KillDriverResponseField extends StandaloneRestProtocolField
24-
private[spark] object KillDriverResponseField extends StandaloneRestProtocolFieldCompanion {
23+
private[spark] abstract class KillDriverResponseField extends SubmitRestProtocolField
24+
private[spark] object KillDriverResponseField extends SubmitRestProtocolFieldCompanion {
2525
case object ACTION extends KillDriverResponseField
2626
case object SPARK_VERSION extends KillDriverResponseField
2727
case object MESSAGE extends KillDriverResponseField
@@ -33,16 +33,16 @@ private[spark] object KillDriverResponseField extends StandaloneRestProtocolFiel
3333
}
3434

3535
/**
36-
* A message sent from the standalone Master in response to a KillDriverResponseMessage.
36+
* A message sent from the cluster manager in response to a KillDriverResponseMessage.
3737
*/
38-
private[spark] class KillDriverResponseMessage extends StandaloneRestProtocolMessage(
39-
StandaloneRestProtocolAction.KILL_DRIVER_RESPONSE,
38+
private[spark] class KillDriverResponseMessage extends SubmitRestProtocolMessage(
39+
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
4040
KillDriverResponseField.ACTION,
4141
KillDriverResponseField.requiredFields)
4242

43-
private[spark] object KillDriverResponseMessage extends StandaloneRestProtocolMessageCompanion {
44-
protected override def newMessage(): StandaloneRestProtocolMessage =
43+
private[spark] object KillDriverResponseMessage extends SubmitRestProtocolMessageCompanion {
44+
protected override def newMessage(): SubmitRestProtocolMessage =
4545
new KillDriverResponseMessage
46-
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
46+
protected override def fieldWithName(field: String): SubmitRestProtocolField =
4747
KillDriverResponseField.withName(field)
4848
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 17 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,22 @@
1717

1818
package org.apache.spark.deploy.rest
1919

20-
import java.io.DataOutputStream
2120
import java.net.URL
22-
import java.net.HttpURLConnection
23-
24-
import scala.io.Source
25-
26-
import com.google.common.base.Charsets
2721

2822
import org.apache.spark.{SPARK_VERSION => sparkVersion}
2923
import org.apache.spark.deploy.SparkSubmitArguments
3024
import org.apache.spark.util.Utils
3125

3226
/**
33-
* A client that submits Spark applications using a stable REST protocol in standalone
34-
* cluster mode. This client is intended to communicate with the StandaloneRestServer.
27+
* A client that submits Spark applications to the standalone Master using a stable
28+
* REST protocol. This client is intended to communicate with the StandaloneRestServer,
29+
* and currently only used in cluster mode.
3530
*/
36-
private[spark] class StandaloneRestClient {
37-
38-
def submitDriver(args: SparkSubmitArguments): Unit = {
39-
validateSubmitArguments(args)
40-
val url = getHttpUrl(args.master)
41-
val request = constructSubmitRequest(args)
42-
val response = sendHttp(url, request)
43-
println(response.toJson)
44-
}
45-
46-
def killDriver(master: String, driverId: String): Unit = {
47-
validateMaster(master)
48-
val url = getHttpUrl(master)
49-
val request = constructKillRequest(master, driverId)
50-
val response = sendHttp(url, request)
51-
println(response.toJson)
52-
}
53-
54-
def requestDriverStatus(master: String, driverId: String): Unit = {
55-
validateMaster(master)
56-
val url = getHttpUrl(master)
57-
val request = constructStatusRequest(master, driverId)
58-
val response = sendHttp(url, request)
59-
println(response.toJson)
60-
}
31+
private[spark] class StandaloneRestClient extends SubmitRestClient {
6132

62-
/**
63-
* Construct a submit driver request message.
64-
*/
65-
private def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequestMessage = {
33+
/** Construct a submit driver request message. */
34+
override protected def constructSubmitRequest(
35+
args: SparkSubmitArguments): SubmitDriverRequestMessage = {
6636
import SubmitDriverRequestField._
6737
val driverMemory = Option(args.driverMemory)
6838
.map { m => Utils.memoryStringToMb(m).toString }
@@ -78,7 +48,6 @@ private[spark] class StandaloneRestClient {
7848
.setFieldIfNotNull(MAIN_CLASS, args.mainClass)
7949
.setFieldIfNotNull(JARS, args.jars)
8050
.setFieldIfNotNull(FILES, args.files)
81-
.setFieldIfNotNull(PY_FILES, args.pyFiles)
8251
.setFieldIfNotNull(DRIVER_MEMORY, driverMemory)
8352
.setFieldIfNotNull(DRIVER_CORES, args.driverCores)
8453
.setFieldIfNotNull(DRIVER_EXTRA_JAVA_OPTIONS, args.driverExtraJavaOptions)
@@ -97,10 +66,8 @@ private[spark] class StandaloneRestClient {
9766
message.validate()
9867
}
9968

100-
/**
101-
* Construct a kill driver request message.
102-
*/
103-
private def constructKillRequest(
69+
/** Construct a kill driver request message. */
70+
override protected def constructKillRequest(
10471
master: String,
10572
driverId: String): KillDriverRequestMessage = {
10673
import KillDriverRequestField._
@@ -111,10 +78,8 @@ private[spark] class StandaloneRestClient {
11178
.validate()
11279
}
11380

114-
/**
115-
* Construct a driver status request message.
116-
*/
117-
private def constructStatusRequest(
81+
/** Construct a driver status request message. */
82+
override protected def constructStatusRequest(
11883
master: String,
11984
driverId: String): DriverStatusRequestMessage = {
12085
import DriverStatusRequestField._
@@ -125,67 +90,23 @@ private[spark] class StandaloneRestClient {
12590
.validate()
12691
}
12792

128-
/**
129-
* Send the provided request in an HTTP message to the given URL.
130-
* Return the response received from the REST server.
131-
*/
132-
private def sendHttp(
133-
url: URL,
134-
request: StandaloneRestProtocolMessage): StandaloneRestProtocolMessage = {
135-
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
136-
conn.setRequestMethod("POST")
137-
conn.setRequestProperty("Content-Type", "application/json")
138-
conn.setRequestProperty("charset", "utf-8")
139-
conn.setDoOutput(true)
140-
println("Sending this JSON blob to server:\n" + request.toJson)
141-
val content = request.toJson.getBytes(Charsets.UTF_8)
142-
val out = new DataOutputStream(conn.getOutputStream)
143-
out.write(content)
144-
out.close()
145-
val response = Source.fromInputStream(conn.getInputStream).mkString
146-
StandaloneRestProtocolMessage.fromJson(response)
147-
}
148-
149-
/**
150-
* Throw an exception if this is not standalone cluster mode.
151-
*/
152-
private def validateSubmitArguments(args: SparkSubmitArguments): Unit = {
153-
validateMaster(args.master)
154-
validateDeployMode(args.deployMode)
155-
}
156-
157-
/**
158-
* Throw an exception if this is not standalone mode.
159-
*/
160-
private def validateMaster(master: String): Unit = {
93+
/** Throw an exception if this is not standalone mode. */
94+
override protected def validateMaster(master: String): Unit = {
16195
if (!master.startsWith("spark://")) {
16296
throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
16397
}
16498
}
16599

166-
/**
167-
* Throw an exception if this is not cluster deploy mode.
168-
*/
169-
private def validateDeployMode(deployMode: String): Unit = {
100+
/** Throw an exception if this is not cluster deploy mode. */
101+
override protected def validateDeployMode(deployMode: String): Unit = {
170102
if (deployMode != "cluster") {
171103
throw new IllegalArgumentException("This REST client is only supported in cluster mode.")
172104
}
173105
}
174106

175-
/**
176-
* Extract the URL portion of the master address.
177-
*/
178-
private def getHttpUrl(master: String): URL = {
107+
/** Extract the URL portion of the master address. */
108+
override protected def getHttpUrl(master: String): URL = {
179109
validateMaster(master)
180110
new URL("http://" + master.stripPrefix("spark://"))
181111
}
182112
}
183-
184-
object StandaloneRestClient {
185-
def main(args: Array[String]): Unit = {
186-
assert(args.length > 0)
187-
//val client = new StandaloneRestClient
188-
//client.submitDriver("spark://" + args(0))
189-
println("Done.")
190-
}
191-
}

0 commit comments

Comments
 (0)