Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -606,48 +606,6 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple annotations can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.labels</code></td>
<td>(none)</td>
<td>
<i>Deprecated.</i> Use <code>spark.kubernetes.driver.label.<labelKey></code> instead which supports <code>=</code>
and <code>,</code> characters in label values.
Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the driver pod
for bookkeeping purposes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.annotations</code></td>
<td>(none)</td>
<td>
<i>Deprecated.</i> Use <code>spark.kubernetes.driver.annotation.<annotationKey></code> instead which supports
<code>=</code> and <code>,</code> characters in annotation values.
Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
pairs, where each annotation is in the format <code>key=value</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.labels</code></td>
<td>(none)</td>
<td>
<i>Deprecated.</i> Use <code>spark.kubernetes.executor.label.<labelKey></code> instead which supports
<code>=</code> and <code>,</code> characters in label values.
Custom labels that will be added to the executor pods. This should be a comma-separated list of label key-value
pairs, where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the
executor pods for bookkeeping purposes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.annotations</code></td>
<td>(none)</td>
<td>
<i>Deprecated.</i> Use <code>spark.kubernetes.executor.annotation.<annotationKey></code> instead which supports
<code>=</code> and <code>,</code> characters in annotation values.
Custom annotations that will be added to the executor pods. This should be a comma-separated list of annotation
key-value pairs, where each annotation is in the format <code>key=value</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.pod.name</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.OptionalConfigEntry

object ConfigurationUtils extends Logging {
def parseKeyValuePairs(
Expand All @@ -41,31 +40,6 @@ object ConfigurationUtils extends Logging {
}).getOrElse(Map.empty[String, String])
}

/**
 * Merges key-value pairs taken from a deprecated comma-separated config entry with
 * pairs taken from individually-prefixed config keys, logging a deprecation warning
 * when the legacy entry is set.
 *
 * @param sparkConf configuration to read from
 * @param prefix per-key config prefix, e.g. "spark.kubernetes.driver.label."
 * @param deprecatedConf the legacy single-string config entry being phased out
 * @param configType human-readable name ("label", "annotation", ...) used in messages
 * @return the merged map; fails via require() if any key appears in both sources
 */
def combinePrefixedKeyValuePairsWithDeprecatedConf(
    sparkConf: SparkConf,
    prefix: String,
    deprecatedConf: OptionalConfigEntry[String],
    configType: String): Map[String, String] = {
  val legacyValue = sparkConf.get(deprecatedConf)
  // Warn once when the deprecated form is in use so operators migrate to the prefix form.
  if (legacyValue.isDefined) {
    logWarning(s"Configuration with key ${deprecatedConf.key} is deprecated. Use" +
      s" configurations with prefix $prefix<key> instead.")
  }
  val legacyPairs = parseKeyValuePairs(legacyValue, deprecatedConf.key, configType)
  val prefixedPairs = sparkConf.getAllWithPrefix(prefix)
  val merged = legacyPairs.toSeq ++ prefixedPairs
  // A key supplied by both the deprecated entry and a prefixed entry is ambiguous: reject it.
  for ((key, values) <- merged.groupBy(_._1)) {
    require(values.size == 1,
      s"Cannot have multiple values for a given $configType key, got key $key with" +
        s" values $values")
  }
  merged.toMap
}

def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,43 +115,12 @@ package object config extends Logging {

// Prefixes for per-key pod metadata configs, e.g. spark.kubernetes.driver.label.<key>=<value>.
private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."

private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

// Deprecated single-string form; superseded by KUBERNETES_DRIVER_LABEL_PREFIX per-key configs
// (see docs/running-on-kubernetes.md), which support '=' and ',' in values.
private[spark] val KUBERNETES_DRIVER_LABELS =
ConfigBuilder("spark.kubernetes.driver.labels")
.doc("Custom labels that will be added to the driver pod. This should be a comma-separated" +
" list of label key-value pairs, where each label is in the format key=value. Note that" +
" Spark also adds its own labels to the driver pod for bookkeeping purposes.")
.stringConf
.createOptional

// Prefix for environment variables injected into the driver container.
private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

// Deprecated single-string form; superseded by KUBERNETES_DRIVER_ANNOTATION_PREFIX per-key configs.
private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.driver.annotations")
.doc("Custom annotations that will be added to the driver pod. This should be a" +
" comma-separated list of annotation key-value pairs, where each annotation is in the" +
" format key=value.")
.stringConf
.createOptional

// Deprecated single-string form; superseded by KUBERNETES_EXECUTOR_LABEL_PREFIX per-key configs.
private[spark] val KUBERNETES_EXECUTOR_LABELS =
ConfigBuilder("spark.kubernetes.executor.labels")
.doc("Custom labels that will be added to the executor pods. This should be a" +
" comma-separated list of label key-value pairs, where each label is in the format" +
" key=value.")
.stringConf
.createOptional

// Deprecated single-string form; superseded by KUBERNETES_EXECUTOR_ANNOTATION_PREFIX per-key configs.
private[spark] val KUBERNETES_EXECUTOR_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.executor.annotations")
.doc("Custom annotations that will be added to the executor pods. This should be a" +
" comma-separated list of annotation key-value pairs, where each annotation is in the" +
" format key=value.")
.stringConf
.createOptional

// Prefixes for mounting user-specified secrets into driver/executor pods.
private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ private[spark] class DriverConfigurationStepsOrchestrator(
.getOrElse(Array.empty[String]) ++
additionalMainAppPythonFile.toSeq ++
additionalPythonFiles
val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX,
KUBERNETES_DRIVER_LABELS,
"label")
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
Expand Down Expand Up @@ -124,7 +123,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
// Then, indicate to the outer block that the init-container should not handle
// those local files simply by filtering them out.
val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
val smallFilesSecretName = s"${kubernetesAppId}-submitted-files"
val smallFilesSecretName = s"$kubernetesAppId-submitted-files"
val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)
val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ private[spark] class BaseDriverConfigurationStep(
.build()
}
val driverCustomAnnotations = ConfigurationUtils
.combinePrefixedKeyValuePairsWithDeprecatedConf(
.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_ANNOTATION_PREFIX,
KUBERNETES_DRIVER_ANNOTATIONS,
"annotation")
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.scheduler.cluster.k8s
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import org.apache.commons.io.FilenameUtils

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
Expand Down Expand Up @@ -56,10 +55,9 @@ private[spark] class ExecutorPodFactoryImpl(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)

private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX,
KUBERNETES_EXECUTOR_LABELS,
"executor label")
require(
!executorLabels.contains(SPARK_APP_ID_LABEL),
Expand All @@ -70,10 +68,9 @@ private[spark] class ExecutorPodFactoryImpl(
s" Spark.")

private val executorAnnotations =
ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
KUBERNETES_EXECUTOR_ANNOTATIONS,
"executor annotation")
private val nodeSelector =
ConfigurationUtils.parsePrefixedKeyValuePairs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
private val APP_ARGS = Array("arg1", "arg2")
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"

Expand All @@ -49,8 +47,6 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
.set(KUBERNETES_DRIVER_MEMORY_OVERHEAD, 200L)
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
.set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set("spark.kubernetes.driver.annotations",
s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")

Expand Down Expand Up @@ -98,7 +94,6 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
val expectedAnnotations = Map(
CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
DEPRECATED_CUSTOM_ANNOTATION_KEY -> DEPRECATED_CUSTOM_ANNOTATION_VALUE,
SPARK_APP_NAME_ANNOTATION -> APP_NAME)
assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {

test ("error thrown if local jars provided without resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

assert(sparkConf.get(RESOURCE_STAGING_SERVER_URI).isEmpty)
Expand All @@ -72,7 +71,6 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {

test ("error not thrown with non-local jars and resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)

Expand All @@ -97,7 +95,6 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {

test ("error not thrown with non-local jars and no resource staging server provided") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
Expand All @@ -120,7 +117,6 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {

test ("including step to contact resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
.set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)

Expand All @@ -145,7 +141,6 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite {

test ("not including steps because no contact to resource staging server") {
val sparkConf = new SparkConf(true)
.set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)

val orchestrator = new InitContainerConfigurationStepsOrchestrator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
sparkConf = kubernetesTestComponents.newSparkConf()
.set(INIT_CONTAINER_DOCKER_IMAGE, s"spark-init:latest")
.set(DRIVER_DOCKER_IMAGE, s"spark-driver:latest")
.set(KUBERNETES_DRIVER_LABELS, s"spark-app-locator=$APP_LOCATOR_LABEL")
.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", APP_LOCATOR_LABEL)
kubernetesTestComponents.createNamespace()
}

Expand Down