From 48f3ddb9b6ab87911b6777968020cb571d2e2ec4 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 11:56:00 -0700
Subject: [PATCH 1/7] New API for custom labels and annotations.

This API allows these labels and annotations to contain = and ,
characters, which is hard to accomplish in the old scheme.
---
 docs/running-on-kubernetes.md               | 44 +++++++++++++++++++
 .../spark/deploy/kubernetes/config.scala    |  5 +++
 .../deploy/kubernetes/submit/Client.scala   | 39 +++++++++++-----
 .../KubernetesClusterSchedulerBackend.scala | 18 ++++++--
 .../kubernetes/submit/ClientV2Suite.scala   | 12 ++++-
 5 files changed, 102 insertions(+), 16 deletions(-)
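The motivation in a nutshell: the deprecated scheme packs every pair into one
comma-separated string, so a value containing = or , cannot be expressed. The
new scheme dedicates one configuration key per label. A minimal sketch of the
difference (the label keys and values below are hypothetical, not taken from
the patch; getAllWithPrefix is the existing SparkConf accessor the patch
relies on):

    import org.apache.spark.SparkConf

    // Old scheme: a single flat string that is split on "," and "=", so a
    // value such as "region=us-east,zone=1" cannot be represented.
    val oldStyle = new SparkConf()
      .set("spark.kubernetes.driver.labels", "identifier=myIdentifier,team=data")

    // New scheme: one configuration entry per label; "=" and "," are ordinary
    // characters inside the value.
    val newStyle = new SparkConf()
      .set("spark.kubernetes.driver.label.identifier", "myIdentifier")
      .set("spark.kubernetes.driver.label.selector", "region=us-east,zone=1")

    // getAllWithPrefix strips the prefix from the returned keys, yielding
    // Array(("identifier", "myIdentifier"), ("selector", "region=us-east,zone=1"))
    val labels = newStyle.getAllWithPrefix("spark.kubernetes.driver.label.")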
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index c10630fc5c5c6..4c55909ad563c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -524,10 +524,51 @@ from the other deployment modes. See the [configuration page](configuration.html
     (typically 6-10%).
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.label.[labelKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds a label to the driver pod, with key <code>labelKey</code> and the value as the configuration's value. For
+    example, setting <code>spark.kubernetes.driver.label.identifier</code> to <code>myIdentifier</code> will result in
+    the driver pod having a label with key <code>identifier</code> and value <code>myIdentifier</code>. Multiple labels
+    can be added by setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.annotation.[annotationKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds an annotation to the driver pod, with key <code>annotationKey</code> and the value as the configuration's
+    value. For example, setting <code>spark.kubernetes.driver.annotation.identifier</code> to <code>myIdentifier</code>
+    will result in the driver pod having an annotation with key <code>identifier</code> and value
+    <code>myIdentifier</code>. Multiple annotations can be added by setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.label.[labelKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds a label to all executor pods, with key <code>labelKey</code> and the value as the configuration's value. For
+    example, setting <code>spark.kubernetes.executor.label.identifier</code> to <code>myIdentifier</code> will result in
+    the executor pods having a label with key <code>identifier</code> and value <code>myIdentifier</code>. Multiple
+    labels can be added by setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.annotation.[annotationKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds an annotation to the executor pods, with key <code>annotationKey</code> and the value as the configuration's
+    value. For example, setting <code>spark.kubernetes.executor.annotation.identifier</code> to <code>myIdentifier</code>
+    will result in the executor pods having an annotation with key <code>identifier</code> and value
+    <code>myIdentifier</code>. Multiple annotations can be added by setting multiple configurations with this prefix.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.labels</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.driver.label.</code> instead.
     Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value
     pairs, where each label is in the format key=value. Note that Spark also adds its own labels to the driver pod
     for bookkeeping purposes.
@@ -537,6 +578,7 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.driver.annotations</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.driver.annotation.</code> instead.
     Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
     pairs, where each annotation is in the format key=value.
   </td>
@@ -545,6 +587,7 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.labels</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.executor.label.</code> instead.
     Custom labels that will be added to the executor pods. This should be a comma-separated list of label key-value
     pairs, where each label is in the format key=value. Note that Spark also adds its own labels to the executor pods
     for bookkeeping purposes.
@@ -554,6 +597,7 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.annotations</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.executor.annotation.</code> instead.
     Custom annotations that will be added to the executor pods. This should be a comma-separated list of annotation
     key-value pairs, where each annotation is in the format key=value.
   </td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index d1fd88fc880d1..70ea19e44ef8c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -113,6 +113,11 @@ package object config extends Logging {
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+  private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
+  private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
   private[spark] val KUBERNETES_DRIVER_LABELS =
     ConfigBuilder("spark.kubernetes.driver.labels")
       .doc("Custom labels that will be added to the driver pod. This should be a comma-separated" +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index c2e616eadc1e0..fba8b64e3b570 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -82,17 +82,36 @@ private[spark] class Client(
   def run(): Unit = {
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
-    val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
+
+    val parsedCustomLabelsDeprecated = ConfigurationUtils.parseKeyValuePairs(
       customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
-    require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
-      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
-      s" operations.")
-    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+    // Remark: getAllWithPrefix strips out the prefix in the returned array.
+    val customLabelsFromPrefixConf = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_LABEL_PREFIX)
+    val allCustomLabels = parsedCustomLabelsDeprecated.toSeq ++ customLabelsFromPrefixConf
+    allCustomLabels.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a label key, got key $key with values $values")
+    }
+    require(!allCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
+      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
+
+    val parsedCustomAnnotationsDeprecated = ConfigurationUtils.parseKeyValuePairs(
       customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
-    require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key" +
-      s" $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping" +
-      s" operations.")
-    val allLabels = parsedCustomLabels ++ Map(
+    val customAnnotationsFromPrefixConf =
+      sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+    val allCustomAnnotations = parsedCustomAnnotationsDeprecated ++ customAnnotationsFromPrefixConf
+    allCustomAnnotations.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for the same annotation key, got key $key" +
+          s"with values $values")
+    }
+    require(!allCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+      s" Spark bookkeeping operations.")
+    val allLabels = allCustomLabels.toMap ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
@@ -139,7 +158,7 @@ private[spark] class Client(
       .withNewMetadata()
         .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
-        .addToAnnotations(parsedCustomAnnotations.asJava)
+        .addToAnnotations(allCustomAnnotations.asJava)
         .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 6ab6480d848a2..9a0a2208a88a5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val executorLabels = ConfigurationUtils.parseKeyValuePairs(
     conf.get(KUBERNETES_EXECUTOR_LABELS),
     KUBERNETES_EXECUTOR_LABELS.key,
-    "executor labels")
+    "executor labels").toSeq ++ conf.getAllWithPrefix(KUBERNETES_EXECUTOR_LABEL_PREFIX)
   require(
     !executorLabels.contains(SPARK_APP_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" +
@@ -71,10 +71,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
     !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
     s" Spark.")
+  executorLabels.groupBy(_._1).foreach {
+    case (key, values) =>
+      require(values.size == 1,
+        s"Cannot have multiple values for a label key, got key $key with values $values")
+  }
   private val executorAnnotations = ConfigurationUtils.parseKeyValuePairs(
     conf.get(KUBERNETES_EXECUTOR_ANNOTATIONS),
     KUBERNETES_EXECUTOR_ANNOTATIONS.key,
-    "executor annotations")
+    "executor annotations").toSeq ++ conf.getAllWithPrefix(KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  executorAnnotations.groupBy(_._1).foreach {
+    case (key, values) =>
+      require(values.size == 1,
+        s"Cannot have multiple values for an annotation key, got key $key with values $values")
+  }
   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
@@ -344,8 +354,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val basePodBuilder = new PodBuilder()
       .withNewMetadata()
         .withName(name)
-        .withLabels(resolvedExecutorLabels.asJava)
-        .withAnnotations(executorAnnotations.asJava)
+        .withLabels(resolvedExecutorLabels.toMap.asJava)
+        .withAnnotations(executorAnnotations.toMap.asJava)
         .withOwnerReferences()
         .addNewOwnerReference()
           .withController(true)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index 193f36a7423b2..4b1e38d3f94c6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -49,12 +49,17 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private val APP_ID = "spark-id"
   private val CUSTOM_LABEL_KEY = "customLabel"
   private val CUSTOM_LABEL_VALUE = "customLabelValue"
+  private val DEPRECATED_CUSTOM_LABEL_KEY = "deprecatedCustomLabel"
+  private val DEPRECATED_CUSTOM_LABEL_VALUE = "deprecatedCustomLabelValue"
   private val ALL_EXPECTED_LABELS = Map(
     CUSTOM_LABEL_KEY -> CUSTOM_LABEL_VALUE,
+    DEPRECATED_CUSTOM_LABEL_KEY -> DEPRECATED_CUSTOM_LABEL_VALUE,
     SPARK_APP_ID_LABEL -> APP_ID,
     SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
   private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "deprecatedCustomAnnotation"
+  private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "deprecatedCustomAnnotationValue"
   private val INIT_CONTAINER_SECRET_NAME = "init-container-secret"
   private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data")
   private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
@@ -94,8 +99,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
     .set(DRIVER_DOCKER_IMAGE, CUSTOM_DRIVER_IMAGE)
     .set(org.apache.spark.internal.config.DRIVER_MEMORY, DRIVER_MEMORY_MB.toLong)
     .set(KUBERNETES_DRIVER_MEMORY_OVERHEAD, DRIVER_MEMORY_OVERHEAD_MB.toLong)
-    .set(KUBERNETES_DRIVER_LABELS, s"$CUSTOM_LABEL_KEY=$CUSTOM_LABEL_VALUE")
-    .set(KUBERNETES_DRIVER_ANNOTATIONS, s"$CUSTOM_ANNOTATION_KEY=$CUSTOM_ANNOTATION_VALUE")
+    .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
+    .set(KUBERNETES_DRIVER_ANNOTATIONS, s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
+    .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
+    .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
     .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, DRIVER_EXTRA_CLASSPATH)
     .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, DRIVER_JAVA_OPTIONS)
   private val EXECUTOR_INIT_CONF_KEY = "executor-init-conf"
@@ -444,6 +451,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private def podHasCorrectAnnotations(pod: Pod): Boolean = {
     val expectedAnnotations = Map(
+      DEPRECATED_CUSTOM_ANNOTATION_KEY -> DEPRECATED_CUSTOM_ANNOTATION_VALUE,
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
       SPARK_APP_NAME_ANNOTATION -> APP_NAME,
       BOOTSTRAPPED_POD_ANNOTATION -> TRUE)
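Note that after this patch the label collections are Seq[(String, String)]
rather than Map[String, String], which makes the reserved-key checks above
silently vacuous: contains compares a bare String against key-value tuples.
The next commit corrects this. A minimal sketch of the pitfall (the key and
value below are hypothetical):

    val labels: Seq[(String, String)] = Seq("spark-app-id" -> "my-app")

    // Compiles, because the element type widens to Any, but is always false:
    // a String never equals a (String, String) tuple.
    labels.contains("spark-app-id")           // false

    // Comparing against the keys behaves as intended:
    labels.map(_._1).contains("spark-app-id") // true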
From e6e91968e7bfd0586b5a8c3ac9e5f54ef5f9bf94 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 12:02:35 -0700
Subject: [PATCH 2/7] Compare correct values in requirements
---
 .../apache/spark/deploy/kubernetes/submit/Client.scala | 8 ++++----
 .../kubernetes/KubernetesClusterSchedulerBackend.scala | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index fba8b64e3b570..ef01b70844e4a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -93,12 +93,12 @@ private[spark] class Client(
         require(values.size == 1,
           s"Cannot have multiple values for a label key, got key $key with values $values")
     }
-    require(!allCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
+    require(!allCustomLabels.map(_._1).contains(SPARK_APP_ID_LABEL), s"Label with key " +
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
       s" operations.")
 
     val parsedCustomAnnotationsDeprecated = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations").toSeq
     val customAnnotationsFromPrefixConf =
       sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ANNOTATION_PREFIX)
     val allCustomAnnotations = parsedCustomAnnotationsDeprecated ++ customAnnotationsFromPrefixConf
@@ -108,7 +108,7 @@ private[spark] class Client(
           s"Cannot have multiple values for the same annotation key, got key $key" +
           s"with values $values")
     }
-    require(!allCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+    require(!allCustomAnnotations.map(_._1).contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
       s" Spark bookkeeping operations.")
     val allLabels = allCustomLabels.toMap ++ Map(
@@ -158,7 +158,7 @@ private[spark] class Client(
       .withNewMetadata()
         .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
-        .addToAnnotations(allCustomAnnotations.asJava)
+        .addToAnnotations(allCustomAnnotations.toMap.asJava)
         .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 9a0a2208a88a5..f1999f4badb87 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -64,11 +64,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
     KUBERNETES_EXECUTOR_LABELS.key,
     "executor labels").toSeq ++ conf.getAllWithPrefix(KUBERNETES_EXECUTOR_LABEL_PREFIX)
   require(
-    !executorLabels.contains(SPARK_APP_ID_LABEL),
+    !executorLabels.map(_._1).contains(SPARK_APP_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" +
     s" reserved for Spark.")
   require(
-    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+    !executorLabels.map(_._1).contains(SPARK_EXECUTOR_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
     s" Spark.")
   executorLabels.groupBy(_._1).foreach {
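Independent of the key-membership fix above, the groupBy validation introduced
in the previous patch rejects a key supplied through both the deprecated and
the prefixed configuration. A sketch of how that check fires (hypothetical
keys and values):

    // "env" arrives once via spark.kubernetes.driver.labels and once via
    // spark.kubernetes.driver.label.env.
    val combined = Seq("env" -> "prod") ++ Seq("env" -> "staging")

    combined.groupBy(_._1).foreach {
      case (key, values) =>
        // Throws IllegalArgumentException for key "env": values has size 2.
        require(values.size == 1,
          s"Cannot have multiple values for a label key, got key $key with values $values")
    }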
From 124c862cc24b8e146f920af2f8d99f8db59d33d1 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 12:13:26 -0700
Subject: [PATCH 3/7] Use helper method
---
 .../kubernetes/ConfigurationUtils.scala     | 23 ++++++++++-
 .../deploy/kubernetes/submit/Client.scala   | 38 +++++++------------
 .../KubernetesClusterSchedulerBackend.scala | 37 ++++++++----------
 3 files changed, 51 insertions(+), 47 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index f3bd598556019..fb57778afcd2d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.deploy.kubernetes
 
-import org.apache.spark.SparkException
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.{SparkConf, SparkException}
 
 object ConfigurationUtils {
   def parseKeyValuePairs(
@@ -38,4 +39,24 @@ object ConfigurationUtils {
         }).toMap
       }).getOrElse(Map.empty[String, String])
   }
+
+  def combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf: SparkConf,
+      prefix: String,
+      deprecatedConf: OptionalConfigEntry[String],
+      configType: String): Map[String, String] = {
+    val fromDeprecated = parseKeyValuePairs(
+      sparkConf.get(deprecatedConf),
+      deprecatedConf.key,
+      configType)
+    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+    val combined = fromDeprecated.toSeq ++ fromPrefix
+    combined.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a given $configType key, got key $key with" +
+          s" values $values")
+    }
+    combined.toMap
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index ef01b70844e4a..2381789f8f1e5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
 
@@ -83,35 +84,24 @@ private[spark] class Client(
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
 
-    val parsedCustomLabelsDeprecated = ConfigurationUtils.parseKeyValuePairs(
-      customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
-    // Remark: getAllWithPrefix strips out the prefix in the returned array.
-    val customLabelsFromPrefixConf = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_LABEL_PREFIX)
-    val allCustomLabels = parsedCustomLabelsDeprecated.toSeq ++ customLabelsFromPrefixConf
-    allCustomLabels.groupBy(_._1).foreach {
-      case (key, values) =>
-        require(values.size == 1,
-          s"Cannot have multiple values for a label key, got key $key with values $values")
-    }
-    require(!allCustomLabels.map(_._1).contains(SPARK_APP_ID_LABEL), s"Label with key " +
+    val allCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf,
+      KUBERNETES_DRIVER_LABEL_PREFIX,
+      KUBERNETES_DRIVER_LABELS,
+      "label")
+    require(!allCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
       s" operations.")
 
-    val parsedCustomAnnotationsDeprecated = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations").toSeq
-    val customAnnotationsFromPrefixConf =
-      sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    val allCustomAnnotations = parsedCustomAnnotationsDeprecated ++ customAnnotationsFromPrefixConf
-    allCustomAnnotations.groupBy(_._1).foreach {
-      case (key, values) =>
-        require(values.size == 1,
-          s"Cannot have multiple values for the same annotation key, got key $key" +
-          s"with values $values")
-    }
-    require(!allCustomAnnotations.map(_._1).contains(SPARK_APP_NAME_ANNOTATION),
+    val allCustomAnnotations = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf,
+      KUBERNETES_DRIVER_ANNOTATION_PREFIX,
+      KUBERNETES_DRIVER_ANNOTATIONS,
+      "annotation")
+    require(!allCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
       s" Spark bookkeeping operations.")
-    val allLabels = allCustomLabels.toMap ++ Map(
+    val allLabels = allCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index f1999f4badb87..a0ca380aafb99 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -59,33 +59,26 @@ private[spark] class KubernetesClusterSchedulerBackend(
     org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
   private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
 
-  private val executorLabels = ConfigurationUtils.parseKeyValuePairs(
-    conf.get(KUBERNETES_EXECUTOR_LABELS),
-    KUBERNETES_EXECUTOR_LABELS.key,
-    "executor labels").toSeq ++ conf.getAllWithPrefix(KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    conf,
+    KUBERNETES_EXECUTOR_LABEL_PREFIX,
+    KUBERNETES_EXECUTOR_LABELS,
+    "executor label")
   require(
-    !executorLabels.map(_._1).contains(SPARK_APP_ID_LABEL),
+    !executorLabels.contains(SPARK_APP_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" +
     s" reserved for Spark.")
   require(
-    !executorLabels.map(_._1).contains(SPARK_EXECUTOR_ID_LABEL),
+    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
     s" Spark.")
-  executorLabels.groupBy(_._1).foreach {
-    case (key, values) =>
-      require(values.size == 1,
-        s"Cannot have multiple values for a label key, got key $key with values $values")
-  }
-  private val executorAnnotations = ConfigurationUtils.parseKeyValuePairs(
-    conf.get(KUBERNETES_EXECUTOR_ANNOTATIONS),
-    KUBERNETES_EXECUTOR_ANNOTATIONS.key,
-    "executor annotations").toSeq ++ conf.getAllWithPrefix(KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  executorAnnotations.groupBy(_._1).foreach {
-    case (key, values) =>
-      require(values.size == 1,
-        s"Cannot have multiple values for an annotation key, got key $key with values $values")
-  }
+  private val executorAnnotations =
+    ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      conf,
+      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+      KUBERNETES_EXECUTOR_ANNOTATIONS,
+      "executor annotation")
   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -354,8 +347,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val basePodBuilder = new PodBuilder()
       .withNewMetadata()
         .withName(name)
-        .withLabels(resolvedExecutorLabels.toMap.asJava)
-        .withAnnotations(executorAnnotations.toMap.asJava)
+        .withLabels(resolvedExecutorLabels.asJava)
+        .withAnnotations(executorAnnotations.asJava)
         .withOwnerReferences()
         .addNewOwnerReference()
           .withController(true)
From 2abbe8554fb765241dd7512d31f62b604100c339 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 12:20:36 -0700
Subject: [PATCH 4/7] Address comments.
---
 docs/running-on-kubernetes.md                    | 12 ++++++++----
 .../deploy/kubernetes/ConfigurationUtils.scala   | 10 ++++++++--
 .../spark/deploy/kubernetes/submit/Client.scala  | 16 ++++++++--------
 3 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 4c55909ad563c..52d847b4420cf 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -568,7 +568,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.driver.labels</code></td>
   <td>(none)</td>
   <td>
-    Deprecated. Use <code>spark.kubernetes.driver.label.</code> instead.
+    Deprecated. Use <code>spark.kubernetes.driver.label.</code> instead which supports <code>=</code>
+    and <code>,</code> characters in label values.
     Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value
     pairs, where each label is in the format key=value. Note that Spark also adds its own labels to the driver pod
     for bookkeeping purposes.
@@ -578,7 +579,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.driver.annotations</code></td>
   <td>(none)</td>
   <td>
-    Deprecated. Use <code>spark.kubernetes.driver.annotation.</code> instead.
+    Deprecated. Use <code>spark.kubernetes.driver.annotation.</code> instead which supports
+    <code>=</code> and <code>,</code> characters in annotation values.
     Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
     pairs, where each annotation is in the format key=value.
   </td>
@@ -587,7 +589,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.labels</code></td>
   <td>(none)</td>
   <td>
-    Deprecated. Use <code>spark.kubernetes.executor.label.</code> instead.
+    Deprecated. Use <code>spark.kubernetes.executor.label.</code> instead which supports
+    <code>=</code> and <code>,</code> characters in label values.
     Custom labels that will be added to the executor pods. This should be a comma-separated list of label key-value
     pairs, where each label is in the format key=value. Note that Spark also adds its own labels to the executor pods
     for bookkeeping purposes.
@@ -597,7 +600,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.annotations</code></td>
   <td>(none)</td>
   <td>
-    Deprecated. Use <code>spark.kubernetes.executor.annotation.</code> instead.
+    Deprecated. Use <code>spark.kubernetes.executor.annotation.</code> instead which supports
+    <code>=</code> and <code>,</code> characters in annotation values.
     Custom annotations that will be added to the executor pods. This should be a comma-separated list of annotation
     key-value pairs, where each annotation is in the format key=value.
   </td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index fb57778afcd2d..34c129c7fa9d4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy.kubernetes
 
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 
-object ConfigurationUtils {
+object ConfigurationUtils extends Logging {
   def parseKeyValuePairs(
       maybeKeyValues: Option[String],
       configKey: String,
@@ -45,6 +46,11 @@ object ConfigurationUtils {
       prefix: String,
       deprecatedConf: OptionalConfigEntry[String],
       configType: String): Map[String, String] = {
+    val deprecatedRawString = sparkConf.get(deprecatedConf)
+    deprecatedRawString.foreach { _ =>
+      logWarning(s"Configuration with key ${deprecatedConf.key} is deprecated. Use" +
+        s" configurations with prefix $prefix instead.")
+    }
     val fromDeprecated = parseKeyValuePairs(
       sparkConf.get(deprecatedConf),
       deprecatedConf.key,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index 2381789f8f1e5..a9699d8c34b4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -84,24 +84,24 @@ private[spark] class Client(
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
 
-    val allCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
       sparkConf,
       KUBERNETES_DRIVER_LABEL_PREFIX,
       KUBERNETES_DRIVER_LABELS,
       "label")
-    require(!allCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
+    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
       s" operations.")
 
-    val allCustomAnnotations = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    val driverCustomAnnotations = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
       sparkConf,
       KUBERNETES_DRIVER_ANNOTATION_PREFIX,
       KUBERNETES_DRIVER_ANNOTATIONS,
       "annotation")
-    require(!allCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
       s" Spark bookkeeping operations.")
-    val allLabels = allCustomLabels ++ Map(
+    val allDriverLabels = driverCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
@@ -147,8 +147,8 @@ private[spark] class Client(
     val basePod = new PodBuilder()
       .withNewMetadata()
         .withName(kubernetesDriverPodName)
-        .addToLabels(allLabels.asJava)
-        .addToAnnotations(allCustomAnnotations.toMap.asJava)
+        .addToLabels(allDriverLabels.asJava)
+        .addToAnnotations(driverCustomAnnotations.toMap.asJava)
         .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
@@ -157,7 +157,7 @@ private[spark] class Client(
       .endSpec()
 
     val maybeSubmittedDependencyUploader = initContainerComponentsProvider
-      .provideInitContainerSubmittedDependencyUploader(allLabels)
+      .provideInitContainerSubmittedDependencyUploader(allDriverLabels)
     val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader =>
      SubmittedResources(uploader.uploadJars(), uploader.uploadFiles())
    }
From eb8c9f15818287bdb924d2f1fb8f1e5a3b74e8fc Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 12:22:54 -0700
Subject: [PATCH 5/7] Fix scalastyle
---
 .../apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index 4b1e38d3f94c6..3945bef5bcfb8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -100,7 +100,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
     .set(org.apache.spark.internal.config.DRIVER_MEMORY, DRIVER_MEMORY_MB.toLong)
     .set(KUBERNETES_DRIVER_MEMORY_OVERHEAD, DRIVER_MEMORY_OVERHEAD_MB.toLong)
     .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
-    .set(KUBERNETES_DRIVER_ANNOTATIONS, s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
+    .set(KUBERNETES_DRIVER_ANNOTATIONS,
+      s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
     .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
     .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
     .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, DRIVER_EXTRA_CLASSPATH)

From 29e02aa5e20a3d300469d7e34fd468069652e64a Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 12:25:52 -0700
Subject: [PATCH 6/7] Use variable
---
 .../apache/spark/deploy/kubernetes/ConfigurationUtils.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index 34c129c7fa9d4..d268a50caf6fa 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -46,13 +46,13 @@ object ConfigurationUtils extends Logging {
       prefix: String,
       deprecatedConf: OptionalConfigEntry[String],
       configType: String): Map[String, String] = {
-    val deprecatedRawString = sparkConf.get(deprecatedConf)
-    deprecatedRawString.foreach { _ =>
+    val deprecatedKeyValuePairsString = sparkConf.get(deprecatedConf)
+    deprecatedKeyValuePairsString.foreach { _ =>
       logWarning(s"Configuration with key ${deprecatedConf.key} is deprecated. Use" +
         s" configurations with prefix $prefix instead.")
     }
     val fromDeprecated = parseKeyValuePairs(
-      sparkConf.get(deprecatedConf),
+      deprecatedKeyValuePairsString,
       deprecatedConf.key,
       configType)
     val fromPrefix = sparkConf.getAllWithPrefix(prefix)

From 256703dc766cb89be99318f43ed8243b87ad5c01 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 14 Jun 2017 13:05:23 -0700
Subject: [PATCH 7/7] Remove unused import
---
 .../org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index d268a50caf6fa..f461da4809b4d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config.OptionalConfigEntry
 
 object ConfigurationUtils extends Logging {
   def parseKeyValuePairs(