Skip to content

Commit f98660b

Browse files
author
Andrew Or
committed
Version the protocol and include it in REST URL
Now the REST URLs look like this: http://host:port/v1/submissions/create http://host:port/v1/submissions/kill/driver_123 http://host:port/v1/submissions/status/driver_123
1 parent 721819f commit f98660b

File tree

2 files changed

+57
-13
lines changed

2 files changed

+57
-13
lines changed

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,18 @@ import org.apache.spark.deploy.SparkSubmitArguments
3333
* currently used for cluster mode only.
3434
*
3535
* The specific request sent to the server depends on the action as follows:
36-
* (1) submit - POST to http://.../submissions/create
37-
* (2) kill - POST http://.../submissions/kill/[submissionId]
38-
* (3) status - GET http://.../submissions/status/[submissionId]
36+
* (1) submit - POST to /submissions/create
37+
* (2) kill - POST /submissions/kill/[submissionId]
38+
* (3) status - GET /submissions/status/[submissionId]
3939
*
4040
* In the case of (1), parameters are posted in the HTTP body in the form of JSON fields.
4141
* Otherwise, the URL fully specifies the intended action of the client.
42+
*
43+
* Additionally, the base URL includes the version of the protocol. For instance:
44+
* http://1.2.3.4:6066/v1/submissions/create. Since the protocol is expected to be stable
45+
* across Spark versions, existing fields cannot be added or removed. In the rare event that
46+
* backward compatibility is broken, Spark must introduce a new protocol version (e.g. v2).
47+
* The client and the server must communicate on the same version of the protocol.
4248
*/
4349
private[spark] class StandaloneRestClient extends Logging {
4450
import StandaloneRestClient._
@@ -147,20 +153,25 @@ private[spark] class StandaloneRestClient extends Logging {
147153

148154
/** Return the REST URL for creating a new submission. */
149155
private def getSubmitUrl(master: String): URL = {
150-
val baseUrl = master.stripPrefix("spark://")
151-
new URL(s"http://$baseUrl/submissions/create")
156+
val baseUrl = getBaseUrl(master)
157+
new URL(s"$baseUrl/submissions/create")
152158
}
153159

154160
/** Return the REST URL for killing an existing submission. */
155161
private def getKillUrl(master: String, submissionId: String): URL = {
156-
val baseUrl = master.stripPrefix("spark://")
157-
new URL(s"http://$baseUrl/submissions/kill/$submissionId")
162+
val baseUrl = getBaseUrl(master)
163+
new URL(s"$baseUrl/submissions/kill/$submissionId")
158164
}
159165

160166
/** Return the REST URL for requesting the status of an existing submission. */
161167
private def getStatusUrl(master: String, submissionId: String): URL = {
162-
val baseUrl = master.stripPrefix("spark://")
163-
new URL(s"http://$baseUrl/submissions/status/$submissionId")
168+
val baseUrl = getBaseUrl(master)
169+
new URL(s"$baseUrl/submissions/status/$submissionId")
170+
}
171+
172+
/** Return the base URL for communicating with the server, including the protocol version. */
173+
private def getBaseUrl(master: String): String = {
174+
"http://" + master.stripPrefix("spark://").stripSuffix("/") + "/" + PROTOCOL_VERSION
164175
}
165176

166177
/** Throw an exception if this is not standalone mode. */
@@ -261,4 +272,5 @@ private[spark] class StandaloneRestClient extends Logging {
261272
private object StandaloneRestClient {
262273
val REPORT_DRIVER_STATUS_INTERVAL = 1000
263274
val REPORT_DRIVER_STATUS_MAX_TRIES = 10
275+
val PROTOCOL_VERSION = "v1"
264276
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ private[spark] class StandaloneRestServer(
4545
requestedPort: Int)
4646
extends Logging {
4747

48+
import StandaloneRestServer._
49+
4850
private var _server: Option[Server] = None
4951

5052
/** Start the server and return the bound port. */
@@ -67,11 +69,17 @@ private[spark] class StandaloneRestServer(
6769
val threadPool = new QueuedThreadPool
6870
threadPool.setDaemon(true)
6971
server.setThreadPool(threadPool)
72+
val pathPrefix = s"/$PROTOCOL_VERSION/submissions"
7073
val mainHandler = new ServletContextHandler
71-
mainHandler.setContextPath("/submissions")
72-
mainHandler.addServlet(new ServletHolder(new KillRequestServlet(master)), "/kill/*")
73-
mainHandler.addServlet(new ServletHolder(new StatusRequestServlet(master)), "/status/*")
74-
mainHandler.addServlet(new ServletHolder(new SubmitRequestServlet(master)), "/create")
74+
mainHandler.setContextPath("/")
75+
mainHandler.addServlet(
76+
new ServletHolder(new SubmitRequestServlet(master)), s"$pathPrefix/create")
77+
mainHandler.addServlet(
78+
new ServletHolder(new KillRequestServlet(master)), s"$pathPrefix/kill/*")
79+
mainHandler.addServlet(
80+
new ServletHolder(new StatusRequestServlet(master)), s"$pathPrefix/status/*")
81+
mainHandler.addServlet(
82+
new ServletHolder(new ErrorServlet), "/")
7583
server.setHandler(mainHandler)
7684
server.start()
7785
val boundPort = server.getConnectors()(0).getLocalPort
@@ -83,6 +91,10 @@ private[spark] class StandaloneRestServer(
8391
}
8492
}
8593

94+
private object StandaloneRestServer {
95+
val PROTOCOL_VERSION = StandaloneRestClient.PROTOCOL_VERSION
96+
}
97+
8698
/**
8799
* An abstract servlet for handling requests passed to the [[StandaloneRestServer]].
88100
*/
@@ -346,3 +358,23 @@ private[spark] class SubmitRequestServlet(master: Master) extends StandaloneRest
346358
appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
347359
}
348360
}
361+
362+
/**
363+
* A default servlet that handles error cases that are not captured by other servlets.
364+
*/
365+
private[spark] class ErrorServlet extends HttpServlet {
366+
private val expectedVersion = StandaloneRestServer.PROTOCOL_VERSION
367+
override def service(request: HttpServletRequest, response: HttpServletResponse): Unit = {
368+
val path = request.getPathInfo
369+
val parts = path.stripPrefix("/").split("/")
370+
if (parts.nonEmpty) {
371+
val version = parts.head
372+
if (version != expectedVersion) {
373+
response.sendError(800, s"Incompatible protocol version $version")
374+
return
375+
}
376+
}
377+
response.sendError(801,
378+
s"Unexpected path $path: Please submit requests through /$expectedVersion/submissions/")
379+
}
380+
}

0 commit comments

Comments
 (0)