diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 9d49ac682972..a5806e98ee22 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -187,6 +187,14 @@ from the other deployment modes. See the [configuration page](configuration.html
for bookkeeping purposes.
+
+ spark.kubernetes.driver.annotations |
+ (none) |
+
+ Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
+ pairs, where each annotation is in the format key=value.
+ |
+
spark.kubernetes.driverSubmitTimeout |
60s |
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index c787d5917e38..c9831ce23ed0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -18,7 +18,6 @@ package org.apache.spark.deploy.kubernetes
import java.io.File
import java.security.SecureRandom
-import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.google.common.io.Files
@@ -73,6 +72,7 @@ private[spark] class Client(
private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
+ private val customAnnotations = sparkConf.get(KUBERNETES_DRIVER_ANNOTATIONS)
private val kubernetesResourceCleaner = new KubernetesResourceCleaner
@@ -90,7 +90,18 @@ private[spark] class Client(
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
s" is a directory.")
}
- val parsedCustomLabels = parseCustomLabels(customLabels)
+ val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key,
+ "labels")
+ parsedCustomLabels.keys.foreach { key =>
+ require(key != SPARK_APP_ID_LABEL, "Label with key" +
+ s" $SPARK_APP_ID_LABEL cannot be used in" +
+ " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
+ " internal configuration.")
+ }
+ val parsedCustomAnnotations = parseKeyValuePairs(
+ customAnnotations,
+ KUBERNETES_DRIVER_ANNOTATIONS.key,
+ "annotations")
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
@@ -134,6 +145,7 @@ private[spark] class Client(
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
+ parsedCustomAnnotations,
submitServerSecret,
sslConfiguration)
configureOwnerReferences(
@@ -215,14 +227,15 @@ private[spark] class Client(
private def launchDriverKubernetesComponents(
kubernetesClient: KubernetesClient,
- parsedCustomLabels: Map[String, String],
+ customLabels: Map[String, String],
+ customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
- ++ parsedCustomLabels).asJava
+ ++ customLabels)
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
@@ -249,6 +262,7 @@ private[spark] class Client(
val driverPod = createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
+ customAnnotations,
submitServerSecret,
sslConfiguration)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
@@ -342,7 +356,7 @@ private[spark] class Client(
private def createDriverService(
kubernetesClient: KubernetesClient,
- driverKubernetesSelectors: java.util.Map[String, String],
+ driverKubernetesSelectors: Map[String, String],
submitServerSecret: Secret): Service = {
val driverSubmissionServicePort = new ServicePortBuilder()
.withName(SUBMISSION_SERVER_PORT_NAME)
@@ -352,11 +366,11 @@ private[spark] class Client(
kubernetesClient.services().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
- .withLabels(driverKubernetesSelectors)
+ .withLabels(driverKubernetesSelectors.asJava)
.endMetadata()
.withNewSpec()
.withType("NodePort")
- .withSelector(driverKubernetesSelectors)
+ .withSelector(driverKubernetesSelectors.asJava)
.withPorts(driverSubmissionServicePort)
.endSpec()
.done()
@@ -364,7 +378,8 @@ private[spark] class Client(
private def createDriverPod(
kubernetesClient: KubernetesClient,
- driverKubernetesSelectors: util.Map[String, String],
+ driverKubernetesSelectors: Map[String, String],
+ customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
@@ -376,7 +391,8 @@ private[spark] class Client(
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
- .withLabels(driverKubernetesSelectors)
+ .withLabels(driverKubernetesSelectors.asJava)
+ .withAnnotations(customAnnotations.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
@@ -601,20 +617,19 @@ private[spark] class Client(
connectTimeoutMillis = 5000)
}
- private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
- maybeLabels.map(labels => {
- labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
- label.split("=", 2).toSeq match {
+ private def parseKeyValuePairs(
+ maybeKeyValues: Option[String],
+ configKey: String,
+ keyValueType: String): Map[String, String] = {
+ maybeKeyValues.map(keyValues => {
+ keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
+ keyValue.split("=", 2).toSeq match {
case Seq(k, v) =>
- require(k != SPARK_APP_ID_LABEL, "Label with key" +
- s" $SPARK_APP_ID_LABEL cannot be used in" +
- " spark.kubernetes.driver.labels, as it is reserved for Spark's" +
- " internal configuration.")
(k, v)
case _ =>
- throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
- " must be a comma-separated list of key-value pairs, with format =." +
- s" Got label: $label. All labels: $labels")
+ throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
+ s" comma-separated list of key-value pairs, with format =." +
+ s" Got value: $keyValue. All values: $keyValues")
}
}).toMap
}).getOrElse(Map.empty[String, String])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index a21ec2101cc6..bc2f9d578555 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -118,6 +118,16 @@ package object config {
.stringConf
.createOptional
+ private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
+ ConfigBuilder("spark.kubernetes.driver.annotations")
+ .doc("""
+ | Custom annotations that will be added to the driver pod.
+ | This should be a comma-separated list of annotation key-value
+ | pairs, where each annotation is in the format key=value.
+ """.stripMargin)
+ .stringConf
+ .createOptional
+
private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
.doc("""
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index fe171db15b3d..11c85caa6fc9 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -229,7 +229,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
expectationsForStaticAllocation(sparkMetricsService)
}
- test("Run with custom labels") {
+ test("Run with custom labels and annotations") {
val args = Array(
"--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
"--deploy-mode", "cluster",
@@ -246,26 +246,35 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
"--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
+ "--conf", "spark.kubernetes.driver.annotations=" +
+ "annotation1=annotation1value," +
+ "annotation2=annotation2value",
"--conf", "spark.kubernetes.submit.waitAppCompletion=false",
EXAMPLES_JAR_FILE.getAbsolutePath)
SparkSubmit.main(args)
- val driverPodLabels = minikubeKubernetesClient
+ val driverPodMetadata = minikubeKubernetesClient
.pods
.withLabel("spark-app-name", "spark-pi")
.list()
.getItems
.get(0)
.getMetadata
- .getLabels
+ val driverPodLabels = driverPodMetadata.getLabels
// We can't match all of the selectors directly since one of the selectors is based on the
// launch time.
- assert(driverPodLabels.size == 5, "Unexpected number of pod labels.")
- assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" +
+ assert(driverPodLabels.size === 5, "Unexpected number of pod labels.")
+ assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" +
" spark-app-name label.")
assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
" spark-app-id label (should be prefixed with the app name).")
- assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
- assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
+ assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1")
+ assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2")
+ val driverPodAnnotations = driverPodMetadata.getAnnotations
+ assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.")
+ assert(driverPodAnnotations.get("annotation1") === "annotation1value",
+ "Unexpected value for annotation1")
+ assert(driverPodAnnotations.get("annotation2") === "annotation2value",
+ "Unexpected value for annotation2")
}
test("Enable SSL on the driver submit server") {