From 9d42a9cba7ef601c406b0f48eefc9ca3a96c42ea Mon Sep 17 00:00:00 2001 From: tangzhankun Date: Mon, 24 Jul 2017 14:28:33 +0800 Subject: [PATCH 1/2] allow configuration to set environment variables on driver and executor as below: --conf spark.executorEnv.[EnvironmentVariableName] --conf spark.driverEnv.[EnvironmentVariableName] --- docs/running-on-kubernetes.md | 16 ++++++++++++++++ .../apache/spark/deploy/kubernetes/config.scala | 2 ++ .../BaseDriverConfigurationStep.scala | 8 ++++++++ .../KubernetesClusterSchedulerBackend.scala | 4 ++-- .../BaseDriverConfigurationStepSuite.scala | 9 ++++++++- 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 5e23801e15b10..45d7e2107e32d 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -768,6 +768,22 @@ from the other deployment modes. See the [configuration page](configuration.html myIdentifier. Multiple node selector keys can be added by setting multiple configurations with this prefix. + + spark.executorEnv.[EnvironmentVariableName] + (none) + + Add the environment variable specified by EnvironmentVariableName to + the Executor process. The user can specify multiple of these to set multiple environment variables. + + + + spark.driverEnv.[EnvironmentVariableName] + (none) + + Add the environment variable specified by EnvironmentVariableName to + the Driver process. The user can specify multiple of these to set multiple environment variables. + + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index f9c4c9c6a1e18..40da5a478452b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -126,6 +126,8 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.driverEnv." + private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = ConfigBuilder("spark.kubernetes.driver.annotations") .doc("Custom annotations that will be added to the driver pod. This should be a" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala index b3f509b44054e..ae6cdf8e53e98 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala @@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep( require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" + s" Spark bookkeeping operations.") + + val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + .map(env => new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build()) + val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName) val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs( submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector") @@ -91,6 +98,7 @@ private[spark] class BaseDriverConfigurationStep( .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) .withImagePullPolicy(dockerImagePullPolicy) + .addAllToEnv(driverCustomEnvs.asJava) .addToEnv(driverExtraClasspathEnv.toSeq: _*) .addNewEnv() .withName(ENV_DRIVER_MEMORY) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index a50a9c8bb9c3b..4eae6ee3184ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -455,7 +455,7 @@ private[spark] class KubernetesClusterSchedulerBackend( .withValue(cp) .build() } - val requiredEnv = Seq( + val requiredEnv = (Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), // Executor backend expects integral value for executor cores, so round it up to an int. @@ -463,7 +463,7 @@ private[spark] class KubernetesClusterSchedulerBackend( (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId()), (ENV_EXECUTOR_ID, executorId), - (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) + (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq) .map(env => new EnvVarBuilder() .withName(env._1) .withValue(env._2) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala index c7d80a16a1532..4520c40ec81c1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala @@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite { private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated" private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue" + private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1" + private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2" test("Set all possible configurations from the user.") { val sparkConf = new SparkConf() @@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite { .set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE) .set("spark.kubernetes.driver.annotations", s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE") + .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1") + .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2") + val submissionStep = new BaseDriverConfigurationStep( APP_ID, RESOURCE_NAME_PREFIX, @@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite { .asScala .map(env => (env.getName, env.getValue)) .toMap - assert(envs.size === 4) + assert(envs.size === 6) assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar") assert(envs(ENV_DRIVER_MEMORY) === "456m") assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS) assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2") + assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1") + assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2") val resourceRequirements = preparedDriverSpec.driverContainer.getResources val requests = resourceRequirements.getRequests.asScala assert(requests("cpu").getAmount === "2") From c5df8544a6461f3965ac3be374c3fd69865b1ea1 Mon Sep 17 00:00:00 2001 From: tangzhankun Date: Fri, 28 Jul 2017 11:06:07 +0800 Subject: [PATCH 2/2] change the driver environment key prefix to spark.kubernetes.driverEnv. --- docs/running-on-kubernetes.md | 2 +- .../scala/org/apache/spark/deploy/kubernetes/config.scala | 2 +- .../submit/submitsteps/BaseDriverConfigurationStep.scala | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 45d7e2107e32d..4286ab19eb3ad 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -777,7 +777,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.driverEnv.[EnvironmentVariableName] + spark.kubernetes.driverEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 40da5a478452b..6e1633f6a63cb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -126,7 +126,7 @@ package object config extends Logging { .stringConf .createOptional - private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.driverEnv." + private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv." private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = ConfigBuilder("spark.kubernetes.driver.annotations") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala index ae6cdf8e53e98..a8539e0772163 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala @@ -75,9 +75,9 @@ private[spark] class BaseDriverConfigurationStep( val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq .map(env => new EnvVarBuilder() - .withName(env._1) - .withValue(env._2) - .build()) + .withName(env._1) + .withValue(env._2) + .build()) val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName) val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(