From d383efba12c66addb17006dea107bb0421d50bc3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?=
Date: Fri, 31 Mar 2017 21:57:09 +0800
Subject: [PATCH 1/7] [SPARK-20177] Small documentation fixes for the compression settings.

---
 docs/configuration.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index a9753925407d7..156997b539e65 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -639,6 +639,7 @@ Apart from these, the following properties are also available, and may be useful
     false
     Whether to compress logged events, if spark.eventLog.enabled is true.
+    Compression will use spark.io.compression.codec.
@@ -773,14 +774,15 @@ Apart from these, the following properties are also available, and may be useful
     true
     Whether to compress broadcast variables before sending them. Generally a good idea.
+    Compression will use spark.io.compression.codec.

     spark.io.compression.codec
     lz4
-    The codec used to compress internal data such as RDD partitions, broadcast variables and
-    shuffle outputs. By default, Spark provides three codecs: lz4, lzf,
+    The codec used to compress internal data such as RDD partitions,event log, broadcast variables
+    and shuffle outputs. By default, Spark provides three codecs: lz4, lzf,
     and snappy. You can also use fully qualified class names to specify the codec,
     e.g.
     org.apache.spark.io.LZ4CompressionCodec,
@@ -881,6 +883,7 @@ Apart from these, the following properties are also available, and may be useful
     StorageLevel.MEMORY_ONLY_SER in Java and Scala or StorageLevel.MEMORY_ONLY in Python).
     Can save substantial space at the cost of some extra CPU time.
+    Compression will use spark.io.compression.codec.

From 3059013e9d2aec76def14eb314b6761bea0e7ca0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?=
Date: Sat, 1 Apr 2017 09:38:02 +0800
Subject: [PATCH 2/7] [SPARK-20177] Add a missing space before 'event log'.

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 156997b539e65..2687f542b8bd3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -781,7 +781,7 @@ Apart from these, the following properties are also available, and may be useful
     spark.io.compression.codec
     lz4
-    The codec used to compress internal data such as RDD partitions,event log, broadcast variables
+    The codec used to compress internal data such as RDD partitions, event log, broadcast variables
     and shuffle outputs. By default, Spark provides three codecs: lz4, lzf,
     and snappy. You can also use fully qualified class names to specify the codec,
     e.g.
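Taken together, the documentation tweaks in patches 1 and 2 all say the same thing: event-log, broadcast and serialized-RDD compression go through the one codec chosen by spark.io.compression.codec. The minimal sketch below shows how the settings combine; it is editorial illustration rather than part of the patches, the application name is made up, and the last property (spark.rdd.compress) is the entry whose description the third hunk touches.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CompressionCodecExample {
  def main(args: Array[String]): Unit = {
    // Illustrative only: every artifact whose docs now say "Compression will
    // use spark.io.compression.codec" is compressed with the codec set below.
    val conf = new SparkConf()
      .setAppName("compression-codec-example")   // made-up name
      .set("spark.io.compression.codec", "lz4")  // lz4 (default), lzf or snappy
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")    // event logs use the codec above
      .set("spark.broadcast.compress", "true")   // broadcast variables use it too
      .set("spark.rdd.compress", "true")         // serialized RDD partitions as well

    val sc = new SparkContext(conf)
    sc.stop()
  }
}
```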
From 555cef88fe09134ac98fd0ad056121c7df2539aa Mon Sep 17 00:00:00 2001
From: guoxiaolongzte
Date: Sun, 2 Apr 2017 08:16:08 +0800
Subject: [PATCH 3/7] '/applications/[app-id]/jobs' in the REST API: status should be [running|succeeded|failed|unknown]

---
 docs/monitoring.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 80519525af0c3..c1e827224b5b4 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -289,7 +289,7 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     /applications/[app-id]/jobs
     A list of all jobs for a given application.
-    <br>?status=[complete|succeeded|failed] list only jobs in the specific state.
+    <br>?status=[running|succeeded|failed|unknown] list only jobs in the specific state.

From 0efb0dd9e404229cce638fe3fb0c966276784df7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?=
Date: Wed, 5 Apr 2017 11:47:53 +0800
Subject: [PATCH 4/7] [SPARK-20218] '/applications/[app-id]/stages' in the REST API: add a description of the status parameter.

---
 docs/monitoring.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4d0617d253b80..d180d77e2cd4d 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -299,6 +299,7 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     /applications/[app-id]/stages
     A list of all stages for a given application.
+    <br>?status=[active|complete|pending|failed] list only stages in the state.

     /applications/[app-id]/stages/[stage-id]

From 0e37fdeee28e31fc97436dabd001d3c85c5a7794 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=83=AD=E5=B0=8F=E9=BE=99=2010207633?=
Date: Wed, 5 Apr 2017 13:22:54 +0800
Subject: [PATCH 5/7] [SPARK-20218] '/applications/[app-id]/stages/[stage-id]' in the REST API: remove a redundant description.

---
 docs/monitoring.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index d180d77e2cd4d..da954385dc452 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -305,7 +305,6 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     /applications/[app-id]/stages/[stage-id]
     A list of all attempts for the given stage.
-    <br>?status=[active|complete|pending|failed] list only stages in the state.
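For readers who want to check the status values documented in patches 3 through 5, the sketch below queries the two endpoints those patches touch. It is not part of the patches: the host, port and application ID are placeholders, and /api/v1 is the standard mount point of the monitoring REST API.

```scala
import scala.io.Source

object MonitoringApiExample {
  def main(args: Array[String]): Unit = {
    // Placeholders: adjust the host/port and substitute a real application ID.
    val base  = "http://localhost:4040/api/v1"
    val appId = "app-00000000000000-0000"

    // Jobs can be filtered by status: running | succeeded | failed | unknown.
    val runningJobs =
      Source.fromURL(s"$base/applications/$appId/jobs?status=running").mkString

    // Stages can be filtered by status: active | complete | pending | failed.
    val failedStages =
      Source.fromURL(s"$base/applications/$appId/stages?status=failed").mkString

    println(runningJobs)
    println(failedStages)
  }
}
```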
From 06cfd9aa134d75187c0635f4101cf75d3b2a0c6e Mon Sep 17 00:00:00 2001
From: guoxiaolong
Date: Fri, 21 Apr 2017 16:49:40 +0800
Subject: [PATCH 6/7] [SPARK-20428][CORE][REST] The REST kill interface 'v1/submissions/kill/' currently only supports deleting a single 'driver'; let Spark support deleting several 'drivers'.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../deploy/rest/RestSubmissionServer.scala   |  6 ++---
 .../deploy/rest/StandaloneRestServer.scala   | 22 ++++++++++++++-----
 .../rest/StandaloneRestSubmitSuite.scala     |  4 ++--
 3 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index b30c980e95a9a..2b3ec6fe0fc12 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -214,15 +214,15 @@ private[rest] abstract class KillRequestServlet extends RestServlet {
   protected override def doPost(
       request: HttpServletRequest,
       response: HttpServletResponse): Unit = {
-    val submissionId = parseSubmissionId(request.getPathInfo)
-    val responseMessage = submissionId.map(handleKill).getOrElse {
+    val submissionIds = parseSubmissionId(request.getPathInfo)
+    val responseMessage = submissionIds.map(handleKill).getOrElse {
       response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
       handleError("Submission ID is missing in kill request.")
     }
     sendResponse(responseMessage, response)
   }

-  protected def handleKill(submissionId: String): KillSubmissionResponse
+  protected def handleKill(submissionIds: String): KillSubmissionResponse
 }

 /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 56620064c57fa..5a18742bda65e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -70,14 +70,24 @@ private[deploy] class StandaloneRestServer(
 private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf)
   extends KillRequestServlet {

-  protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
-      DeployMessages.RequestKillDriver(submissionId))
+  protected def handleKill(submissionIds: String): KillSubmissionResponse = {
     val k = new KillSubmissionResponse
-    k.serverSparkVersion = sparkVersion
-    k.message = response.message
-    k.submissionId = submissionId
+    Some(submissionIds).getOrElse("").split(",").map(submissionId => {
+      val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse] (
+        DeployMessages.RequestKillDriver(submissionId))
+      if(k.submissionId.nonEmpty) {
+        k.submissionId = k.submissionId.concat(",").concat(submissionId)
+      } else {
+        k.submissionId = submissionId
+      }
+      if(k.message.nonEmpty) {
+        k.message = response.message
+      } else {
+        k.message = k.message.concat(",").concat(response.message)
+      }
       k.success = response.success
+    })
+    k.serverSparkVersion = sparkVersion
     k
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index dd50e33da30ac..0d0f4e9ed2065 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -607,8 +607,8 @@ private class FaultyStandaloneRestServer(

   /** A faulty servlet that produces invalid responses. */
   class InvalidKillServlet extends StandaloneKillRequestServlet(masterEndpoint, masterConf) {
-    protected override def handleKill(submissionId: String): KillSubmissionResponse = {
-      val k = super.handleKill(submissionId)
+    protected override def handleKill(submissionIds: String): KillSubmissionResponse = {
+      val k = super.handleKill(submissionIds)
       k.submissionId = null
       k
     }
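After this patch, handleKill splits the value extracted from the request path on commas, so several drivers can in principle be killed with one POST to the standalone kill endpoint. The sketch below is an illustrative client, not part of the change: the master REST URL and driver IDs are placeholders.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object KillSeveralDriversExample {
  def main(args: Array[String]): Unit = {
    // Placeholders: a hypothetical standalone master REST URL and driver IDs.
    val masterRestUrl = "http://spark-master:6066"
    val driverIds = Seq("driver-20170421164940-0000", "driver-20170421164940-0001")

    // One POST naming several submissions, joined by commas as handleKill now expects.
    val url = new URL(s"$masterRestUrl/v1/submissions/kill/${driverIds.mkString(",")}")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")

    // The combined KillSubmissionResponse carries comma-separated
    // submissionId and message fields, one entry per driver.
    val body = Source.fromInputStream(conn.getInputStream).mkString
    println(s"HTTP ${conn.getResponseCode}: $body")
  }
}
```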
From fa34cc9a1e9dd51c3c33fe00fd70353df84599f7 Mon Sep 17 00:00:00 2001
From: guoxiaolong
Date: Wed, 3 May 2017 13:58:50 +0800
Subject: [PATCH 7/7] [SPARK-20428] Add a batch kill endpoint to the REST interface.

---
 .../deploy/rest/RestSubmissionServer.scala | 26 ++++++++++++++++---
 .../deploy/rest/StandaloneRestServer.scala |  8 +++---
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 2b3ec6fe0fc12..524f53729a9e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -206,15 +206,35 @@ private[rest] abstract class RestServlet extends HttpServlet with Logging {
  * A servlet for handling kill requests passed to the [[RestSubmissionServer]].
  */
 private[rest] abstract class KillRequestServlet extends RestServlet {
+  private val BATCH_DELETE = "batch"
+  private val SUBMISSION_IDS = "submissionIds"

   /**
-   * If a submission ID is specified in the URL, have the Master kill the corresponding
-   * driver and return an appropriate response to the client. Otherwise, return error.
+   * 1. If a submission ID is specified in the URL, have the Master kill the corresponding
+   *    driver and return an appropriate response to the client,
+   *    e.g. URL: /v1/submissions/kill/${a submission ID}.
+   * 2. If 'batch' is specified in the URL, have the Master kill the corresponding
+   *    drivers and return an appropriate response to the client,
+   *    e.g. URL: /v1/submissions/kill/batch.
+   * 3. Otherwise, return an error.
    */
   protected override def doPost(
       request: HttpServletRequest,
       response: HttpServletResponse): Unit = {
-    val submissionIds = parseSubmissionId(request.getPathInfo)
+    val submissionId = parseSubmissionId(request.getPathInfo)
+    val submissionIds : Option[String] = {
+      if (BATCH_DELETE == submissionId.getOrElse("")) {
+        val requestMessageJson = Source.fromInputStream(request.getInputStream).mkString
+        val value: Option[String] = parse(requestMessageJson) match {
+          case JObject(fields) =>
+            fields.collectFirst { case (SUBMISSION_IDS, v) => v }.collect { case JString(s) => s }
+          case _ => None
+        }
+        value
+      } else {
+        submissionId
+      }
+    }
     val responseMessage = submissionIds.map(handleKill).getOrElse {
       response.setStatus(HttpServletResponse.SC_BAD_REQUEST)
       handleError("Submission ID is missing in kill request.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 5a18742bda65e..26c8dfc5b8dbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -75,15 +75,15 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
     Some(submissionIds).getOrElse("").split(",").map(submissionId => {
       val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse] (
         DeployMessages.RequestKillDriver(submissionId))
-      if(k.submissionId.nonEmpty) {
+      if(k.submissionId != null && k.submissionId.nonEmpty) {
         k.submissionId = k.submissionId.concat(",").concat(submissionId)
       } else {
         k.submissionId = submissionId
       }
-      if(k.message.nonEmpty) {
-        k.message = response.message
-      } else {
+      if(k.message != null && k.message.nonEmpty) {
         k.message = k.message.concat(",").concat(response.message)
+      } else {
+        k.message = response.message
       }
       k.success = response.success
     })
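Following the doPost change above, a request whose last path segment is batch is expected to carry a JSON body from which a comma-separated submissionIds string is extracted and passed to handleKill. The sketch below shows what such a request would presumably look like; the body shape is my reading of the servlet's JObject/JString pattern match rather than a documented interface, and the master URL and driver IDs are placeholders.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object BatchKillExample {
  def main(args: Array[String]): Unit = {
    // Placeholders: hypothetical master REST URL and driver IDs. The payload
    // shape ({"submissionIds": "id1,id2"}) is inferred from the servlet code.
    val masterRestUrl = "http://spark-master:6066"
    val payload =
      """{"submissionIds":"driver-20170503135850-0000,driver-20170503135850-0001"}"""

    val url = new URL(s"$masterRestUrl/v1/submissions/kill/batch")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    out.write(payload.getBytes("UTF-8"))
    out.close()

    val body = Source.fromInputStream(conn.getInputStream).mkString
    println(s"HTTP ${conn.getResponseCode}: $body")
  }
}
```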