diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index c10630fc5c5c6..52d847b4420cf 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -524,10 +524,52 @@ from the other deployment modes. See the [configuration page](configuration.html
     (typically 6-10%).
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.label.[labelKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds a label to the driver pod, with key <code>labelKey</code> and the value as the
+    configuration's value. For example, setting <code>spark.kubernetes.driver.label.identifier</code>
+    to <code>myIdentifier</code> will result in the driver pod having a label with key
+    <code>identifier</code> and value <code>myIdentifier</code>. Multiple labels can be added by
+    setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.annotation.[annotationKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds an annotation to the driver pod, with key <code>annotationKey</code> and the value as the
+    configuration's value. For example, setting <code>spark.kubernetes.driver.annotation.identifier</code>
+    to <code>myIdentifier</code> will result in the driver pod having an annotation with key
+    <code>identifier</code> and value <code>myIdentifier</code>. Multiple annotations can be added
+    by setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.label.[labelKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds a label to all executor pods, with key <code>labelKey</code> and the value as the
+    configuration's value. For example, setting <code>spark.kubernetes.executor.label.identifier</code>
+    to <code>myIdentifier</code> will result in the executor pods having a label with key
+    <code>identifier</code> and value <code>myIdentifier</code>. Multiple labels can be added by
+    setting multiple configurations with this prefix.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.annotation.[annotationKey]</code></td>
+  <td>(none)</td>
+  <td>
+    Adds an annotation to the executor pods, with key <code>annotationKey</code> and the value as
+    the configuration's value. For example, setting
+    <code>spark.kubernetes.executor.annotation.identifier</code> to <code>myIdentifier</code> will
+    result in the executor pods having an annotation with key <code>identifier</code> and value
+    <code>myIdentifier</code>. Multiple annotations can be added by setting multiple configurations
+    with this prefix.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.labels</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.driver.label.</code> instead, which supports
+    <code>=</code> and <code>,</code> characters in label values.
     Custom labels that will be added to the driver pod. This should be a comma-separated list of
     label key-value pairs, where each label is in the format <code>key=value</code>. Note that
     Spark also adds its own labels to the driver pod for bookkeeping purposes.
@@ -537,6 +579,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.driver.annotations</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.driver.annotation.</code> instead, which supports
+    <code>=</code> and <code>,</code> characters in annotation values.
     Custom annotations that will be added to the driver pod. This should be a comma-separated list
     of annotation key-value pairs, where each annotation is in the format <code>key=value</code>.
@@ -545,6 +589,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.labels</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.executor.label.</code> instead, which supports
+    <code>=</code> and <code>,</code> characters in label values.
     Custom labels that will be added to the executor pods. This should be a comma-separated list
     of label key-value pairs, where each label is in the format <code>key=value</code>. Note that
     Spark also adds its own labels to the executor pods for bookkeeping purposes.
@@ -554,6 +600,8 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.executor.annotations</code></td>
   <td>(none)</td>
   <td>
+    Deprecated. Use <code>spark.kubernetes.executor.annotation.</code> instead, which supports
+    <code>=</code> and <code>,</code> characters in annotation values.
     Custom annotations that will be added to the executor pods. This should be a comma-separated
     list of annotation key-value pairs, where each annotation is in the format <code>key=value</code>.
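To make the new prefix scheme concrete, here is a minimal sketch of submission-side configuration (hypothetical application code, not part of this diff). The point of the prefixed form is that label and annotation values may contain `=` and `,`, which the deprecated comma-separated configs cannot express:

```scala
import org.apache.spark.SparkConf

// Each key segment after the prefix becomes the label/annotation key on the pod,
// and the configuration value becomes its value.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.label.identifier", "myIdentifier")
  // A value containing '=' and ',' would be unparseable under the deprecated
  // spark.kubernetes.driver.annotations comma-separated syntax.
  .set("spark.kubernetes.driver.annotation.buildInfo", "branch=master,commit=abc123")
  .set("spark.kubernetes.executor.label.identifier", "myIdentifier")
```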
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index f3bd598556019..f461da4809b4d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -17,9 +17,11 @@
 package org.apache.spark.deploy.kubernetes
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.OptionalConfigEntry
 
-object ConfigurationUtils {
+object ConfigurationUtils extends Logging {
   def parseKeyValuePairs(
       maybeKeyValues: Option[String],
       configKey: String,
@@ -38,4 +40,29 @@ object ConfigurationUtils {
       }).toMap
     }).getOrElse(Map.empty[String, String])
   }
+
+  def combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf: SparkConf,
+      prefix: String,
+      deprecatedConf: OptionalConfigEntry[String],
+      configType: String): Map[String, String] = {
+    val deprecatedKeyValuePairsString = sparkConf.get(deprecatedConf)
+    deprecatedKeyValuePairsString.foreach { _ =>
+      logWarning(s"Configuration with key ${deprecatedConf.key} is deprecated. Use" +
+        s" configurations with prefix $prefix instead.")
+    }
+    val fromDeprecated = parseKeyValuePairs(
+      deprecatedKeyValuePairsString,
+      deprecatedConf.key,
+      configType)
+    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+    val combined = fromDeprecated.toSeq ++ fromPrefix
+    combined.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a given $configType key, got key $key with" +
+            s" values $values")
+    }
+    combined.toMap
+  }
 }
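A standalone sketch of the merge semantics implemented by `combinePrefixedKeyValuePairsWithDeprecatedConf` (plain Scala, no Spark dependencies; hypothetical keys and values). Pairs parsed from the deprecated comma-separated config are unioned with the prefixed pairs, and a key supplied through both routes fails fast:

```scala
// Pairs parsed from the deprecated form, e.g. "identifier=myIdentifier".
val fromDeprecated = Map("identifier" -> "myIdentifier")
// Pairs as they would come back from getAllWithPrefix("spark.kubernetes.driver.label."),
// i.e. with the prefix already stripped from each key.
val fromPrefix = Seq("env" -> "prod")

val combined = fromDeprecated.toSeq ++ fromPrefix
combined.groupBy(_._1).foreach { case (key, values) =>
  // The same key arriving from both sources is rejected, mirroring the require(...) above.
  require(values.size == 1,
    s"Cannot have multiple values for a given label key, got key $key with values $values")
}
val merged = combined.toMap // Map(identifier -> myIdentifier, env -> prod)
```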
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index d1fd88fc880d1..70ea19e44ef8c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -113,6 +113,11 @@ package object config extends Logging {
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+  private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
+  private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
   private[spark] val KUBERNETES_DRIVER_LABELS =
     ConfigBuilder("spark.kubernetes.driver.labels")
       .doc("Custom labels that will be added to the driver pod. This should be a comma-separated" +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index c2e616eadc1e0..a9699d8c34b4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -29,6 +29,7 @@ import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
 
@@ -82,17 +83,25 @@ private[spark] class Client(
   def run(): Unit = {
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
-    val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
-      customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
-    require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
-      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
-      s" operations.")
-    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
-    require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key" +
-      s" $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping" +
-      s" operations.")
-    val allLabels = parsedCustomLabels ++ Map(
+
+    val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf,
+      KUBERNETES_DRIVER_LABEL_PREFIX,
+      KUBERNETES_DRIVER_LABELS,
+      "label")
+    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
+      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
+
+    val driverCustomAnnotations = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      sparkConf,
+      KUBERNETES_DRIVER_ANNOTATION_PREFIX,
+      KUBERNETES_DRIVER_ANNOTATIONS,
+      "annotation")
+    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+        s" Spark bookkeeping operations.")
+    val allDriverLabels = driverCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
@@ -138,8 +147,8 @@ private[spark] class Client(
     val basePod = new PodBuilder()
       .withNewMetadata()
         .withName(kubernetesDriverPodName)
-        .addToLabels(allLabels.asJava)
-        .addToAnnotations(parsedCustomAnnotations.asJava)
+        .addToLabels(allDriverLabels.asJava)
+        .addToAnnotations(driverCustomAnnotations.toMap.asJava)
         .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
@@ -148,7 +157,7 @@ private[spark] class Client(
         .endSpec()
 
     val maybeSubmittedDependencyUploader = initContainerComponentsProvider
-      .provideInitContainerSubmittedDependencyUploader(allLabels)
+      .provideInitContainerSubmittedDependencyUploader(allDriverLabels)
     val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader =>
       SubmittedResources(uploader.uploadJars(), uploader.uploadFiles())
     }
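For context on the driver-side flow above, a sketch of how the final label set is assembled; the constant values below are illustrative assumptions, not taken from this diff. Custom labels are validated against the reserved bookkeeping key, then Spark's own labels are layered on top:

```scala
// Illustrative stand-ins for the constants referenced in Client.run().
val SPARK_APP_ID_LABEL = "spark-app-id" // assumed value, for illustration only
val SPARK_ROLE_LABEL = "spark-role"     // assumed value, for illustration only
val kubernetesAppId = "spark-abc123"

val driverCustomLabels = Map("identifier" -> "myIdentifier")
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL),
  s"Label with key $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark" +
    " bookkeeping operations.")

// Added last, so Spark's bookkeeping labels always win on collision.
val allDriverLabels = driverCustomLabels ++ Map(
  SPARK_APP_ID_LABEL -> kubernetesAppId,
  SPARK_ROLE_LABEL -> "driver")
```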
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 85ce5f01200b2..4165eb8cbd067 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -62,10 +62,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
       org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
   private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
 
-  private val executorLabels = ConfigurationUtils.parseKeyValuePairs(
-    conf.get(KUBERNETES_EXECUTOR_LABELS),
-    KUBERNETES_EXECUTOR_LABELS.key,
-    "executor labels")
+  private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+    conf,
+    KUBERNETES_EXECUTOR_LABEL_PREFIX,
+    KUBERNETES_EXECUTOR_LABELS,
+    "executor label")
   require(
     !executorLabels.contains(SPARK_APP_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" +
@@ -74,11 +75,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
     !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
       s" Spark.")
-  private val executorAnnotations = ConfigurationUtils.parseKeyValuePairs(
-    conf.get(KUBERNETES_EXECUTOR_ANNOTATIONS),
-    KUBERNETES_EXECUTOR_ANNOTATIONS.key,
-    "executor annotations")
+  private val executorAnnotations =
+    ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
+      conf,
+      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+      KUBERNETES_EXECUTOR_ANNOTATIONS,
+      "executor annotation")
 
   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
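The executor path relies on the same `SparkConf.getAllWithPrefix` behavior as the driver path; a quick illustration with hypothetical keys, showing that the prefix is stripped from the returned pairs:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.executor.label.env", "prod")
  .set("spark.kubernetes.executor.label.team", "data-platform")

// getAllWithPrefix strips the prefix, yielding the bare label keys.
val executorLabels = conf.getAllWithPrefix("spark.kubernetes.executor.label.").toMap
// executorLabels == Map("env" -> "prod", "team" -> "data-platform")
```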
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index 193f36a7423b2..3945bef5bcfb8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -49,12 +49,17 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private val APP_ID = "spark-id"
   private val CUSTOM_LABEL_KEY = "customLabel"
   private val CUSTOM_LABEL_VALUE = "customLabelValue"
+  private val DEPRECATED_CUSTOM_LABEL_KEY = "deprecatedCustomLabel"
+  private val DEPRECATED_CUSTOM_LABEL_VALUE = "deprecatedCustomLabelValue"
   private val ALL_EXPECTED_LABELS = Map(
     CUSTOM_LABEL_KEY -> CUSTOM_LABEL_VALUE,
+    DEPRECATED_CUSTOM_LABEL_KEY -> DEPRECATED_CUSTOM_LABEL_VALUE,
     SPARK_APP_ID_LABEL -> APP_ID,
     SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
   private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "deprecatedCustomAnnotation"
+  private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "deprecatedCustomAnnotationValue"
   private val INIT_CONTAINER_SECRET_NAME = "init-container-secret"
   private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data")
   private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
@@ -94,8 +99,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
     .set(DRIVER_DOCKER_IMAGE, CUSTOM_DRIVER_IMAGE)
     .set(org.apache.spark.internal.config.DRIVER_MEMORY, DRIVER_MEMORY_MB.toLong)
     .set(KUBERNETES_DRIVER_MEMORY_OVERHEAD, DRIVER_MEMORY_OVERHEAD_MB.toLong)
-    .set(KUBERNETES_DRIVER_LABELS, s"$CUSTOM_LABEL_KEY=$CUSTOM_LABEL_VALUE")
-    .set(KUBERNETES_DRIVER_ANNOTATIONS, s"$CUSTOM_ANNOTATION_KEY=$CUSTOM_ANNOTATION_VALUE")
+    .set(KUBERNETES_DRIVER_LABELS, s"$DEPRECATED_CUSTOM_LABEL_KEY=$DEPRECATED_CUSTOM_LABEL_VALUE")
+    .set(KUBERNETES_DRIVER_ANNOTATIONS,
+      s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
+    .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE)
+    .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
     .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, DRIVER_EXTRA_CLASSPATH)
     .set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, DRIVER_JAVA_OPTIONS)
   private val EXECUTOR_INIT_CONF_KEY = "executor-init-conf"
@@ -444,6 +452,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
 
   private def podHasCorrectAnnotations(pod: Pod): Boolean = {
     val expectedAnnotations = Map(
+      DEPRECATED_CUSTOM_ANNOTATION_KEY -> DEPRECATED_CUSTOM_ANNOTATION_VALUE,
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
       SPARK_APP_NAME_ANNOTATION -> APP_NAME,
       BOOTSTRAPPED_POD_ANNOTATION -> TRUE)