diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index b30c980e95a9a..524f53729a9e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -206,23 +206,43 @@ private[rest] abstract class RestServlet extends HttpServlet with Logging { * A servlet for handling kill requests passed to the [[RestSubmissionServer]]. */ private[rest] abstract class KillRequestServlet extends RestServlet { + private val BATCH_DELETE = "batch" + private val SUBMISSION_IDS = "submissionIds" /** - * If a submission ID is specified in the URL, have the Master kill the corresponding - * driver and return an appropriate response to the client. Otherwise, return error. + * 1.If a submission ID is specified in the URL, have the Master kill the corresponding + * driver and return an appropriate response to the client, + * For e.g., URL:/v1/submissions/kill/${a submission ID}. + * 2.if batch is specified in the URL, have the Master kill the corresponding + * drivers and return an appropriate response to the client. + * For e.g., URL:/v1/submissions/kill/batch + * 3.Otherwise, return error. */ protected override def doPost( request: HttpServletRequest, response: HttpServletResponse): Unit = { val submissionId = parseSubmissionId(request.getPathInfo) - val responseMessage = submissionId.map(handleKill).getOrElse { + val submissionIds : Option[String] = { + if (BATCH_DELETE == submissionId.getOrElse("")) { + val requestMessageJson = Source.fromInputStream(request.getInputStream).mkString + val value: Option[String] = parse(requestMessageJson) match { + case JObject(fields) => + fields.collectFirst { case (SUBMISSION_IDS, v) => v }.collect { case JString(s) => s } + case _ => None + } + value + } else { + submissionId + } + } + val responseMessage = submissionIds.map(handleKill).getOrElse { response.setStatus(HttpServletResponse.SC_BAD_REQUEST) handleError("Submission ID is missing in kill request.") } sendResponse(responseMessage, response) } - protected def handleKill(submissionId: String): KillSubmissionResponse + protected def handleKill(submissionIds: String): KillSubmissionResponse } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 56620064c57fa..26c8dfc5b8dbc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -70,14 +70,24 @@ private[deploy] class StandaloneRestServer( private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf) extends KillRequestServlet { - protected def handleKill(submissionId: String): KillSubmissionResponse = { - val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse]( - DeployMessages.RequestKillDriver(submissionId)) + protected def handleKill(submissionIds: String): KillSubmissionResponse = { val k = new KillSubmissionResponse - k.serverSparkVersion = sparkVersion - k.message = response.message - k.submissionId = submissionId + Some(submissionIds).getOrElse("").split(",").map(submissionId => { + val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse] ( + DeployMessages.RequestKillDriver(submissionId)) + if(k.submissionId != null && k.submissionId.nonEmpty) { + k.submissionId = k.submissionId.concat(",").concat(submissionId) + } else { + k.submissionId = submissionId + } + if(k.message != null && k.message.nonEmpty) { + k.message = k.message.concat(",").concat(response.message) + } else { + k.message = response.message + } k.success = response.success + }) + k.serverSparkVersion = sparkVersion k } } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index dd50e33da30ac..0d0f4e9ed2065 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -607,8 +607,8 @@ private class FaultyStandaloneRestServer( /** A faulty servlet that produces invalid responses. */ class InvalidKillServlet extends StandaloneKillRequestServlet(masterEndpoint, masterConf) { - protected override def handleKill(submissionId: String): KillSubmissionResponse = { - val k = super.handleKill(submissionId) + protected override def handleKill(submissionIds: String): KillSubmissionResponse = { + val k = super.handleKill(submissionIds) k.submissionId = null k }