diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5e23801e15b10..4286ab19eb3ad 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -768,6 +768,22 @@ from the other deployment modes. See the [configuration page](configuration.html
     myIdentifier. Multiple node selector keys can be added by setting multiple configurations with this prefix.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to
+    the Executor process. The user can specify multiple of these to set multiple environment variables.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to
+    the Driver process. The user can specify multiple of these to set multiple environment variables.
+  </td>
+</tr>
 </table>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index f9c4c9c6a1e18..6e1633f6a63cb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -126,6 +126,8 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+
   private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
     ConfigBuilder("spark.kubernetes.driver.annotations")
       .doc("Custom annotations that will be added to the driver pod. This should be a" +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
index b3f509b44054e..a8539e0772163 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
@@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep(
     require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
       s" Spark bookkeeping operations.")
+
+    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build())
+
     val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
     val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
       submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
@@ -91,6 +98,7 @@ private[spark] class BaseDriverConfigurationStep(
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverDockerImage)
       .withImagePullPolicy(dockerImagePullPolicy)
+      .addAllToEnv(driverCustomEnvs.asJava)
       .addToEnv(driverExtraClasspathEnv.toSeq: _*)
       .addNewEnv()
         .withName(ENV_DRIVER_MEMORY)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index a50a9c8bb9c3b..4eae6ee3184ba 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -455,7 +455,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withValue(cp)
           .build()
       }
-    val requiredEnv = Seq(
+    val requiredEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -463,7 +463,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
       (ENV_EXECUTOR_ID, executorId),
-      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*"))
+      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala
index c7d80a16a1532..4520c40ec81c1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala
@@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
   private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
   private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
+  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
 
   test("Set all possible configurations from the user.") {
     val sparkConf = new SparkConf()
@@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
       .set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
       .set("spark.kubernetes.driver.annotations",
         s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+
     val submissionStep = new BaseDriverConfigurationStep(
       APP_ID,
       RESOURCE_NAME_PREFIX,
@@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
-    assert(envs.size === 4)
+    assert(envs.size === 6)
     assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar")
     assert(envs(ENV_DRIVER_MEMORY) === "456m")
     assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
     assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
     val resourceRequirements = preparedDriverSpec.driverContainer.getResources
     val requests = resourceRequirements.getRequests.asScala
     assert(requests("cpu").getAmount === "2")
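
Usage note: a minimal sketch of how the two properties introduced by this patch might be set from application code. The variable name LOG_LEVEL and its value are hypothetical, chosen only for illustration; what the patch itself establishes is that spark.kubernetes.driverEnv.* keys are read with getAllWithPrefix in BaseDriverConfigurationStep and injected into the driver container, while spark.executorEnv.* keys reach each executor pod through sc.executorEnvs in KubernetesClusterSchedulerBackend.

    import org.apache.spark.SparkConf

    // Hypothetical example, not part of the patch: both prefixes accept
    // arbitrary variable names after the trailing dot.
    val conf = new SparkConf()
      // Injected into the driver pod by BaseDriverConfigurationStep.
      .set("spark.kubernetes.driverEnv.LOG_LEVEL", "DEBUG")
      // Propagated to every executor pod via sc.executorEnvs.
      .set("spark.executorEnv.LOG_LEVEL", "DEBUG")

The same keys can equivalently be passed with --conf at spark-submit time.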