Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,22 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
    Add the environment variable specified by <code>EnvironmentVariableName</code> to
    the Executor process. Multiple environment variables can be set by specifying multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
    Add the environment variable specified by <code>EnvironmentVariableName</code> to
    the Driver process. Multiple environment variables can be set by specifying multiple configurations with this prefix.
</td>
</tr>
</table>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.driver.annotations")
.doc("Custom annotations that will be added to the driver pod. This should be a" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep(
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
s" Spark bookkeeping operations.")

val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build())

val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
Expand All @@ -91,6 +98,7 @@ private[spark] class BaseDriverConfigurationStep(
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy(dockerImagePullPolicy)
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
.withName(ENV_DRIVER_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withValue(cp)
.build()
}
val requiredEnv = Seq(
val requiredEnv = (Seq(
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
// Executor backend expects integral value for executor cores, so round it up to an int.
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
(ENV_EXECUTOR_ID, executorId),
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*"))
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq)
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"

test("Set all possible configurations from the user.") {
val sparkConf = new SparkConf()
Expand All @@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set("spark.kubernetes.driver.annotations",
s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")

val submissionStep = new BaseDriverConfigurationStep(
APP_ID,
RESOURCE_NAME_PREFIX,
Expand All @@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs.size === 4)
assert(envs.size === 6)
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "456m")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
Expand Down