From 7dd79724eeddf80e4ec395e437a5cd9ad37aa63a Mon Sep 17 00:00:00 2001
From: Pat Shields
Date: Wed, 5 Aug 2015 11:16:45 -0400
Subject: [PATCH 1/4] =?UTF-8?q?[SPARK-9672][MESOS]=20Don=E2=80=99t=20inclu?=
 =?UTF-8?q?de=20SPARK=5FENV=5FLOADED=20when=20passing=20env=20vars?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../spark/deploy/rest/RestSubmissionClient.scala | 16 ++++++++++++----
 .../deploy/rest/StandaloneRestSubmitSuite.scala  |  6 ++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 1fe956320a1b8..f72651463237a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -392,15 +392,14 @@ private[spark] object RestSubmissionClient {
       mainClass: String,
       appArgs: Array[String],
       conf: SparkConf,
-      env: Map[String, String] = sys.env): SubmitRestProtocolResponse = {
+      env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
     val master = conf.getOption("spark.master").getOrElse {
       throw new IllegalArgumentException("'spark.master' must be set.")
     }
     val sparkProperties = conf.getAll.toMap
-    val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
     val client = new RestSubmissionClient(master)
     val submitRequest = client.constructSubmitRequest(
-      appResource, mainClass, appArgs, sparkProperties, environmentVariables)
+      appResource, mainClass, appArgs, sparkProperties, env)
     client.createSubmission(submitRequest)
   }
 
@@ -413,6 +412,15 @@ private[spark] object RestSubmissionClient {
     val mainClass = args(1)
     val appArgs = args.slice(2, args.size)
     val conf = new SparkConf
-    run(appResource, mainClass, appArgs, conf)
+    val env = filterSystemEnvironment(sys.env)
+    run(appResource, mainClass, appArgs, conf, env)
   }
+
+  /**
+   * Filter non-spark environment variables from any environment.
+   */
+  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
+    val sparkVars = env.filter { case (k, _) => k.startsWith("SPARK_") }
+    sparkVars - "SPARK_ENV_LOADED"
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 96e456d889ac3..ad2e6741a447c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -366,6 +366,12 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(conn3.getResponseCode === HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
   }
 
+  test("client does not send 'SPARK_ENV_LOADED' env var by default") {
+    val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_ENV_LOADED" -> "1")
+    val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
+    assert(filteredVariables == Map("SPARK_VAR" -> "1"))
+  }
+
   /* --------------------- *
    |     Helper methods    |
    * --------------------- */

From 28a34b87e79b5529a89f254a1affbffb0bfe29cc Mon Sep 17 00:00:00 2001
From: Pat Shields
Date: Tue, 11 Aug 2015 13:10:46 -0400
Subject: [PATCH 2/4] =?UTF-8?q?[SPARK-9672][MESOS]=20Don=E2=80=99t=20filte?=
 =?UTF-8?q?r=20out=20MESOS=5F*=20when=20passing=20env=20vars?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../org/apache/spark/deploy/rest/RestSubmissionClient.scala | 2 +-
 .../spark/deploy/rest/StandaloneRestSubmitSuite.scala       | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index f72651463237a..c07db145dfc4b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -420,7 +420,7 @@ private[spark] object RestSubmissionClient {
    * Filter non-spark environment variables from any environment.
    */
  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
-    val sparkVars = env.filter { case (k, _) => k.startsWith("SPARK_") }
+    val sparkVars = env.filter { case (k, _) => k.startsWith("SPARK_") || k.startsWith("MESOS_") }
     sparkVars - "SPARK_ENV_LOADED"
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index ad2e6741a447c..9693e32bf6af6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -372,6 +372,12 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(filteredVariables == Map("SPARK_VAR" -> "1"))
   }
 
+  test("client includes mesos env vars") {
+    val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1")
+    val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
+    assert(filteredVariables == Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1"))
+  }
+
   /* --------------------- *
    |     Helper methods    |
    * --------------------- */

From 0382675606b53f81b1f4e2663f4538b54cedb1ad Mon Sep 17 00:00:00 2001
From: Pat Shields
Date: Thu, 3 Sep 2015 08:28:04 -0400
Subject: [PATCH 3/4] Remove SPARK_ENV_LOADED as part of filter to reduce copies

---
 .../org/apache/spark/deploy/rest/RestSubmissionClient.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index c07db145dfc4b..8d806da47ddf7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -420,7 +420,8 @@ private[spark] object RestSubmissionClient {
    * Filter non-spark environment variables from any environment.
    */
  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
-    val sparkVars = env.filter { case (k, _) => k.startsWith("SPARK_") || k.startsWith("MESOS_") }
-    sparkVars - "SPARK_ENV_LOADED"
+    env.filter { case (k, _) =>
+      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
+    }
   }
 }

From 539988ce1399ec3b677ba96f5a3654e5068f0645 Mon Sep 17 00:00:00 2001
From: Pat Shields
Date: Thu, 3 Sep 2015 08:28:37 -0400
Subject: [PATCH 4/4] Make filterSystemEnvironment method protected

---
 .../org/apache/spark/deploy/rest/RestSubmissionClient.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 8d806da47ddf7..65892452afa64 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -419,7 +419,7 @@ private[spark] object RestSubmissionClient {
   /**
    * Filter non-spark environment variables from any environment.
    */
-  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
+  protected[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
     env.filter { case (k, _) =>
       (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
     }
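
For readers skimming the series, here is a small, self-contained Scala sketch of the behavior the four patches converge on: forward SPARK_* variables except SPARK_ENV_LOADED, forward MESOS_* variables, and drop everything else. It is an illustration only, not part of the patches above; the object name EnvFilterSketch and the sample variable values are made up, while the filter body mirrors the final filterSystemEnvironment in RestSubmissionClient.scala.

object EnvFilterSketch {

  // Mirror of the final filter: keep SPARK_* keys (minus SPARK_ENV_LOADED) and MESOS_* keys.
  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
    env.filter { case (k, _) =>
      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
    }
  }

  def main(args: Array[String]): Unit = {
    val sample = Map(
      "SPARK_HOME" -> "/opt/spark",                           // kept
      "SPARK_ENV_LOADED" -> "1",                              // dropped
      "MESOS_NATIVE_JAVA_LIBRARY" -> "/usr/lib/libmesos.so",  // kept
      "PATH" -> "/usr/bin")                                   // dropped
    // Prints (order may vary):
    // Map(SPARK_HOME -> /opt/spark, MESOS_NATIVE_JAVA_LIBRARY -> /usr/lib/libmesos.so)
    println(filterSystemEnvironment(sample))
  }
}

Compare this with the tests added in patches 1 and 2, which assert the same keep/drop behavior through RestSubmissionClient.filterSystemEnvironment.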