From 38a530d270ef405d53b501d182b1ad3540b3a2d3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 1 Mar 2017 15:04:43 -0800 Subject: [PATCH] Allow custom annotations on the driver pod. --- docs/running-on-kubernetes.md | 8 +++ .../spark/deploy/kubernetes/Client.scala | 55 ++++++++++++------- .../spark/deploy/kubernetes/config.scala | 10 ++++ .../integrationtest/KubernetesSuite.scala | 23 +++++--- 4 files changed, 69 insertions(+), 27 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 9d49ac682972..a5806e98ee22 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -187,6 +187,14 @@ from the other deployment modes. See the [configuration page](configuration.html for bookkeeping purposes. + + spark.kubernetes.driver.annotations + (none) + + Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value + pairs, where each annotation is in the format key=value. + + spark.kubernetes.driverSubmitTimeout 60s diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c787d5917e38..c9831ce23ed0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.kubernetes import java.io.File import java.security.SecureRandom -import java.util import java.util.concurrent.{CountDownLatch, TimeUnit} import com.google.common.io.Files @@ -73,6 +72,7 @@ private[spark] class Client( private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS) + private val customAnnotations = sparkConf.get(KUBERNETES_DRIVER_ANNOTATIONS) private val kubernetesResourceCleaner = new KubernetesResourceCleaner @@ -90,7 +90,18 @@ private[spark] class Client( throw new SparkException(s"Main app resource file $mainAppResource is not a file or" + s" is a directory.") } - val parsedCustomLabels = parseCustomLabels(customLabels) + val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key, + "labels") + parsedCustomLabels.keys.foreach { key => + require(key != SPARK_APP_ID_LABEL, "Label with key" + + s" $SPARK_APP_ID_LABEL cannot be used in" + + " spark.kubernetes.driver.labels, as it is reserved for Spark's" + + " internal configuration.") + } + val parsedCustomAnnotations = parseKeyValuePairs( + customAnnotations, + KUBERNETES_DRIVER_ANNOTATIONS.key, + "annotations") var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) @@ -134,6 +145,7 @@ private[spark] class Client( val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, parsedCustomLabels, + parsedCustomAnnotations, submitServerSecret, sslConfiguration) configureOwnerReferences( @@ -215,14 +227,15 @@ private[spark] class Client( private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, - parsedCustomLabels: Map[String, String], + customLabels: Map[String, String], + customAnnotations: Map[String, String], submitServerSecret: Secret, sslConfiguration: SslConfiguration): (Pod, Service) = { val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName) - ++ parsedCustomLabels).asJava + ++ customLabels) val endpointsReadyFuture = SettableFuture.create[Endpoints] val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture) val serviceReadyFuture = SettableFuture.create[Service] @@ -249,6 +262,7 @@ private[spark] class Client( val driverPod = createDriverPod( kubernetesClient, driverKubernetesSelectors, + customAnnotations, submitServerSecret, sslConfiguration) kubernetesResourceCleaner.registerOrUpdateResource(driverPod) @@ -342,7 +356,7 @@ private[spark] class Client( private def createDriverService( kubernetesClient: KubernetesClient, - driverKubernetesSelectors: java.util.Map[String, String], + driverKubernetesSelectors: Map[String, String], submitServerSecret: Secret): Service = { val driverSubmissionServicePort = new ServicePortBuilder() .withName(SUBMISSION_SERVER_PORT_NAME) @@ -352,11 +366,11 @@ private[spark] class Client( kubernetesClient.services().createNew() .withNewMetadata() .withName(kubernetesAppId) - .withLabels(driverKubernetesSelectors) + .withLabels(driverKubernetesSelectors.asJava) .endMetadata() .withNewSpec() .withType("NodePort") - .withSelector(driverKubernetesSelectors) + .withSelector(driverKubernetesSelectors.asJava) .withPorts(driverSubmissionServicePort) .endSpec() .done() @@ -364,7 +378,8 @@ private[spark] class Client( private def createDriverPod( kubernetesClient: KubernetesClient, - driverKubernetesSelectors: util.Map[String, String], + driverKubernetesSelectors: Map[String, String], + customAnnotations: Map[String, String], submitServerSecret: Secret, sslConfiguration: SslConfiguration): Pod = { val containerPorts = buildContainerPorts() @@ -376,7 +391,8 @@ private[spark] class Client( kubernetesClient.pods().createNew() .withNewMetadata() .withName(kubernetesAppId) - .withLabels(driverKubernetesSelectors) + .withLabels(driverKubernetesSelectors.asJava) + .withAnnotations(customAnnotations.asJava) .endMetadata() .withNewSpec() .withRestartPolicy("Never") @@ -601,20 +617,19 @@ private[spark] class Client( connectTimeoutMillis = 5000) } - private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = { - maybeLabels.map(labels => { - labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => { - label.split("=", 2).toSeq match { + private def parseKeyValuePairs( + maybeKeyValues: Option[String], + configKey: String, + keyValueType: String): Map[String, String] = { + maybeKeyValues.map(keyValues => { + keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => { + keyValue.split("=", 2).toSeq match { case Seq(k, v) => - require(k != SPARK_APP_ID_LABEL, "Label with key" + - s" $SPARK_APP_ID_LABEL cannot be used in" + - " spark.kubernetes.driver.labels, as it is reserved for Spark's" + - " internal configuration.") (k, v) case _ => - throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" + - " must be a comma-separated list of key-value pairs, with format =." + - s" Got label: $label. All labels: $labels") + throw new SparkException(s"Custom $keyValueType set by $configKey must be a" + + s" comma-separated list of key-value pairs, with format =." + + s" Got value: $keyValue. All values: $keyValues") } }).toMap }).getOrElse(Map.empty[String, String]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index a21ec2101cc6..bc2f9d578555 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -118,6 +118,16 @@ package object config { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = + ConfigBuilder("spark.kubernetes.driver.annotations") + .doc(""" + | Custom annotations that will be added to the driver pod. + | This should be a comma-separated list of annotation key-value + | pairs, where each annotation is in the format key=value. + """.stripMargin) + .stringConf + .createOptional + private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT = ConfigBuilder("spark.kubernetes.driverSubmitTimeout") .doc(""" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index fe171db15b3d..11c85caa6fc9 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -229,7 +229,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { expectationsForStaticAllocation(sparkMetricsService) } - test("Run with custom labels") { + test("Run with custom labels and annotations") { val args = Array( "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", "--deploy-mode", "cluster", @@ -246,26 +246,35 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value", + "--conf", "spark.kubernetes.driver.annotations=" + + "annotation1=annotation1value," + + "annotation2=annotation2value", "--conf", "spark.kubernetes.submit.waitAppCompletion=false", EXAMPLES_JAR_FILE.getAbsolutePath) SparkSubmit.main(args) - val driverPodLabels = minikubeKubernetesClient + val driverPodMetadata = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-pi") .list() .getItems .get(0) .getMetadata - .getLabels + val driverPodLabels = driverPodMetadata.getLabels // We can't match all of the selectors directly since one of the selectors is based on the // launch time. - assert(driverPodLabels.size == 5, "Unexpected number of pod labels.") - assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" + + assert(driverPodLabels.size === 5, "Unexpected number of pod labels.") + assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" + " spark-app-name label.") assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" + " spark-app-id label (should be prefixed with the app name).") - assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1") - assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2") + assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1") + assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2") + val driverPodAnnotations = driverPodMetadata.getAnnotations + assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.") + assert(driverPodAnnotations.get("annotation1") === "annotation1value", + "Unexpected value for annotation1") + assert(driverPodAnnotations.get("annotation2") === "annotation2value", + "Unexpected value for annotation2") } test("Enable SSL on the driver submit server") {