Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,22 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already documented in docs/configuration.md. Please remove this duplicate config from docs/running-on-kubernetes.md and instead reference it in the spark.kubernetes.driverEnv.[EnvironmentVariableName] section below, with language something like: "For executors, use the generic spark.executorEnv.[EnvironmentVariableName], which works across cluster managers."

<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Executor process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the equivalent of YARN's spark.yarn.appMasterEnv.[EnvironmentVariableName].

<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
</table>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.driver.annotations")
.doc("Custom annotations that will be added to the driver pod. This should be a" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep(
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
s" Spark bookkeeping operations.")

val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build())

val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
Expand All @@ -91,6 +98,7 @@ private[spark] class BaseDriverConfigurationStep(
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy(dockerImagePullPolicy)
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
.withName(ENV_DRIVER_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withValue(cp)
.build()
}
val requiredEnv = Seq(
val requiredEnv = (Seq(
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
// Executor backend expects integral value for executor cores, so round it up to an int.
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
(ENV_EXECUTOR_ID, executorId),
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*"))
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a user specifies an executorEnv that conflicts with one of the above? I think we want to prohibit users from overriding the Spark-internal settings, and throw an exception if they attempt to do so.

.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"

test("Set all possible configurations from the user.") {
val sparkConf = new SparkConf()
Expand All @@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set("spark.kubernetes.driver.annotations",
s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")

val submissionStep = new BaseDriverConfigurationStep(
APP_ID,
RESOURCE_NAME_PREFIX,
Expand All @@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs.size === 4)
assert(envs.size === 6)
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "456m")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
Expand Down