Skip to content

Commit 9fee16f

Browse files
author
Andrew Or
committed
Include server protocol version on mismatch
So that a future implementation of client can retry with the server version.
1 parent 09f873a commit 9fee16f

File tree

4 files changed

+21
-13
lines changed

4 files changed

+21
-13
lines changed

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,9 +375,9 @@ private[spark] class ErrorServlet extends StandaloneRestServlet {
375375
protected override def service(
376376
request: HttpServletRequest,
377377
response: HttpServletResponse): Unit = {
378-
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
379378
val path = request.getPathInfo
380379
val parts = path.stripPrefix("/").split("/").toSeq
380+
var versionMismatch = false
381381
var msg =
382382
parts match {
383383
case Nil =>
@@ -391,15 +391,22 @@ private[spark] class ErrorServlet extends StandaloneRestServlet {
391391
"Missing an action: please specify one of /create, /kill, or /status."
392392
case unknownVersion :: _ =>
393393
// http://host:port/unknown-version/*
394-
// Use a special response code in case the client wants to retry with a different version
395-
response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
394+
versionMismatch = true
396395
s"Unknown protocol version '$unknownVersion'."
397396
case _ =>
398397
// never reached
399398
s"Malformed path $path."
400399
}
401400
msg += s" Please submit requests through http://[host]:[port]/$expectedVersion/submissions/..."
402401
val error = handleError(msg)
402+
// If there is a version mismatch, include the highest protocol version that
403+
// this server supports in case the client wants to retry with our version
404+
if (versionMismatch) {
405+
error.protocolVersion = expectedVersion
406+
response.setStatus(StandaloneRestServer.SC_UNKNOWN_PROTOCOL_VERSION)
407+
} else {
408+
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
409+
}
403410
sendResponse(error, response)
404411
}
405412
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.util.Try
2222
import com.fasterxml.jackson.annotation._
2323
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
2424
import com.fasterxml.jackson.annotation.JsonInclude.Include
25-
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
25+
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
2626
import com.fasterxml.jackson.module.scala.DefaultScalaModule
2727
import org.json4s.JsonAST._
2828
import org.json4s.jackson.JsonMethods._
@@ -136,8 +136,9 @@ private[spark] abstract class SubmitRestProtocolMessage {
136136
private[spark] object SubmitRestProtocolMessage {
137137
private val packagePrefix = this.getClass.getPackage.getName
138138
private val mapper = new ObjectMapper()
139-
.registerModule(DefaultScalaModule)
139+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
140140
.enable(SerializationFeature.INDENT_OUTPUT)
141+
.registerModule(DefaultScalaModule)
141142

142143
/**
143144
* Parse the value of the action field from the given JSON.

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ private[spark] abstract class SubmitRestProtocolResponse extends SubmitRestProto
2626
protected override def doValidate(): Unit = {
2727
super.doValidate()
2828
assertFieldIsSet(serverSparkVersion, "serverSparkVersion")
29-
assertFieldIsSet(success, "success")
3029
assertFieldIsBoolean(success, "success")
3130
}
3231
}
@@ -36,6 +35,10 @@ private[spark] abstract class SubmitRestProtocolResponse extends SubmitRestProto
3635
*/
3736
private[spark] class CreateSubmissionResponse extends SubmitRestProtocolResponse {
3837
var submissionId: String = null
38+
protected override def doValidate(): Unit = {
39+
super.doValidate()
40+
assertFieldIsSet(success, "success")
41+
}
3942
}
4043

4144
/**
@@ -46,6 +49,7 @@ private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse {
4649
protected override def doValidate(): Unit = {
4750
super.doValidate()
4851
assertFieldIsSet(submissionId, "submissionId")
52+
assertFieldIsSet(success, "success")
4953
}
5054
}
5155

@@ -61,20 +65,17 @@ private[spark] class SubmissionStatusResponse extends SubmitRestProtocolResponse
6165
protected override def doValidate(): Unit = {
6266
super.doValidate()
6367
assertFieldIsSet(submissionId, "submissionId")
68+
assertFieldIsSet(success, "success")
6469
}
6570
}
6671

6772
/**
6873
* An error response message used in the REST application submission protocol.
6974
*/
7075
private[spark] class ErrorResponse extends SubmitRestProtocolResponse {
71-
72-
// request was unsuccessful
73-
success = "false"
74-
76+
var protocolVersion: String = null
7577
protected override def doValidate(): Unit = {
7678
super.doValidate()
7779
assertFieldIsSet(message, "message")
78-
assert(!success.toBoolean, s"The 'success' field must be false in $messageType.")
7980
}
8081
}

core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,7 @@ class SubmitRestProtocolSuite extends FunSuite {
327327
|{
328328
| "action" : "ErrorResponse",
329329
| "message" : "Field not found in submit request: X",
330-
| "serverSparkVersion" : "1.2.3",
331-
| "success": "false"
330+
| "serverSparkVersion" : "1.2.3"
332331
|}
333332
""".stripMargin
334333

0 commit comments

Comments
 (0)