Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d383efb
[SPARK-20177]Document about compression way has some little detail ch…
Mar 31, 2017
3059013
[SPARK-20177] event log add a space
Apr 1, 2017
555cef8
'/applications/[app-id]/jobs' in rest api,status should be [running|s…
Apr 2, 2017
46bb1ad
Merge branch 'master' of https://github.com/apache/spark into SPARK-2…
Apr 5, 2017
0efb0dd
[SPARK-20218]'/applications/[app-id]/stages' in REST API,add descript…
Apr 5, 2017
0e37fde
[SPARK-20218] '/applications/[app-id]/stages/[stage-id]' in REST API,…
Apr 5, 2017
52641bb
Merge branch 'SPARK-20218'
Apr 7, 2017
d3977c9
Merge branch 'master' of https://github.com/apache/spark
Apr 8, 2017
137b90e
Merge branch 'master' of https://github.com/apache/spark
Apr 10, 2017
0fe5865
Merge branch 'SPARK-20190' of https://github.com/guoxiaolongzte/spark
Apr 10, 2017
cf6f42a
Merge branch 'master' of https://github.com/apache/spark
Apr 10, 2017
685cd6b
Merge branch 'master' of https://github.com/apache/spark
Apr 14, 2017
c716a92
Merge branch 'master' of https://github.com/apache/spark
Apr 17, 2017
679cec3
Merge branch 'master' of https://github.com/apache/spark
Apr 19, 2017
3c9387a
Merge branch 'master' of https://github.com/apache/spark
Apr 19, 2017
cb71f44
Merge branch 'master' of https://github.com/apache/spark
Apr 20, 2017
ce92a74
Merge branch 'master' of https://github.com/apache/spark
Apr 21, 2017
dd64342
Merge branch 'master' of https://github.com/apache/spark
Apr 21, 2017
06cfd9a
[SPARK-20428][CORE][REST]REST interface about 'v1/submissions/ kill/'…
Apr 21, 2017
fa34cc9
[SPARK-20428]Add the batch to remove the interface.
May 3, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,43 @@ private[rest] abstract class RestServlet extends HttpServlet with Logging {
* A servlet for handling kill requests passed to the [[RestSubmissionServer]].
*/
private[rest] abstract class KillRequestServlet extends RestServlet {
private val BATCH_DELETE = "batch"
private val SUBMISSION_IDS = "submissionIds"

/**
* If a submission ID is specified in the URL, have the Master kill the corresponding
* driver and return an appropriate response to the client. Otherwise, return error.
* 1.If a submission ID is specified in the URL, have the Master kill the corresponding
* driver and return an appropriate response to the client,
* For e.g., URL:/v1/submissions/kill/${a submission ID}.
* 2.if batch is specified in the URL, have the Master kill the corresponding
* drivers and return an appropriate response to the client.
* For e.g., URL:/v1/submissions/kill/batch
* 3.Otherwise, return error.
*/
protected override def doPost(
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
val submissionId = parseSubmissionId(request.getPathInfo)
val responseMessage = submissionId.map(handleKill).getOrElse {
val submissionIds : Option[String] = {
if (BATCH_DELETE == submissionId.getOrElse("")) {
val requestMessageJson = Source.fromInputStream(request.getInputStream).mkString
val value: Option[String] = parse(requestMessageJson) match {
case JObject(fields) =>
fields.collectFirst { case (SUBMISSION_IDS, v) => v }.collect { case JString(s) => s }
case _ => None
}
value
} else {
submissionId
}
}
val responseMessage = submissionIds.map(handleKill).getOrElse {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError("Submission ID is missing in kill request.")
}
sendResponse(responseMessage, response)
}

protected def handleKill(submissionId: String): KillSubmissionResponse
protected def handleKill(submissionIds: String): KillSubmissionResponse
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,24 @@ private[deploy] class StandaloneRestServer(
private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf)
extends KillRequestServlet {

protected def handleKill(submissionId: String): KillSubmissionResponse = {
val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId))
protected def handleKill(submissionIds: String): KillSubmissionResponse = {
val k = new KillSubmissionResponse
k.serverSparkVersion = sparkVersion
k.message = response.message
k.submissionId = submissionId
Some(submissionIds).getOrElse("").split(",").map(submissionId => {
val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse] (
DeployMessages.RequestKillDriver(submissionId))
if(k.submissionId != null && k.submissionId.nonEmpty) {
k.submissionId = k.submissionId.concat(",").concat(submissionId)
} else {
k.submissionId = submissionId
}
if(k.message != null && k.message.nonEmpty) {
k.message = k.message.concat(",").concat(response.message)
} else {
k.message = response.message
}
k.success = response.success
})
k.serverSparkVersion = sparkVersion
k
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,8 @@ private class FaultyStandaloneRestServer(

/** A faulty servlet that produces invalid responses. */
class InvalidKillServlet extends StandaloneKillRequestServlet(masterEndpoint, masterConf) {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = super.handleKill(submissionId)
protected override def handleKill(submissionIds: String): KillSubmissionResponse = {
val k = super.handleKill(submissionIds)
k.submissionId = null
k
}
Expand Down