Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ from the other deployment modes. See the [configuration page](configuration.html
for bookkeeping purposes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.annotations</code></td>
<td>(none)</td>
<td>
Custom annotations that will be added to the driver pod. This should be a comma-separated list of label key-value
pairs, where each annotation is in the format <code>key=value</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverSubmitTimeout</code></td>
<td>60s</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.spark.deploy.kubernetes

import java.io.File
import java.security.SecureRandom
import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit}

import com.google.common.io.Files
Expand Down Expand Up @@ -73,6 +72,7 @@ private[spark] class Client(

private val serviceAccount = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
private val customLabels = sparkConf.get(KUBERNETES_DRIVER_LABELS)
private val customAnnotations = sparkConf.get(KUBERNETES_DRIVER_ANNOTATIONS)

private val kubernetesResourceCleaner = new KubernetesResourceCleaner

Expand All @@ -90,7 +90,18 @@ private[spark] class Client(
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
s" is a directory.")
}
val parsedCustomLabels = parseCustomLabels(customLabels)
val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key,
"labels")
parsedCustomLabels.keys.foreach { key =>
require(key != SPARK_APP_ID_LABEL, "Label with key" +
s" $SPARK_APP_ID_LABEL cannot be used in" +
" spark.kubernetes.driver.labels, as it is reserved for Spark's" +
" internal configuration.")
}
val parsedCustomAnnotations = parseKeyValuePairs(
customAnnotations,
KUBERNETES_DRIVER_ANNOTATIONS.key,
"annotations")
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
Expand Down Expand Up @@ -134,6 +145,7 @@ private[spark] class Client(
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
parsedCustomAnnotations,
submitServerSecret,
sslConfiguration)
configureOwnerReferences(
Expand Down Expand Up @@ -215,14 +227,15 @@ private[spark] class Client(

private def launchDriverKubernetesComponents(
kubernetesClient: KubernetesClient,
parsedCustomLabels: Map[String, String],
customLabels: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
++ customLabels)
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
Expand All @@ -249,6 +262,7 @@ private[spark] class Client(
val driverPod = createDriverPod(
kubernetesClient,
driverKubernetesSelectors,
customAnnotations,
submitServerSecret,
sslConfiguration)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
Expand Down Expand Up @@ -342,7 +356,7 @@ private[spark] class Client(

private def createDriverService(
kubernetesClient: KubernetesClient,
driverKubernetesSelectors: java.util.Map[String, String],
driverKubernetesSelectors: Map[String, String],
submitServerSecret: Secret): Service = {
val driverSubmissionServicePort = new ServicePortBuilder()
.withName(SUBMISSION_SERVER_PORT_NAME)
Expand All @@ -352,19 +366,20 @@ private[spark] class Client(
kubernetesClient.services().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.withLabels(driverKubernetesSelectors)
.withLabels(driverKubernetesSelectors.asJava)
.endMetadata()
.withNewSpec()
.withType("NodePort")
.withSelector(driverKubernetesSelectors)
.withSelector(driverKubernetesSelectors.asJava)
.withPorts(driverSubmissionServicePort)
.endSpec()
.done()
}

private def createDriverPod(
kubernetesClient: KubernetesClient,
driverKubernetesSelectors: util.Map[String, String],
driverKubernetesSelectors: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
Expand All @@ -376,7 +391,8 @@ private[spark] class Client(
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.withLabels(driverKubernetesSelectors)
.withLabels(driverKubernetesSelectors.asJava)
.withAnnotations(customAnnotations.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
Expand Down Expand Up @@ -601,20 +617,19 @@ private[spark] class Client(
connectTimeoutMillis = 5000)
}

private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
maybeLabels.map(labels => {
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
label.split("=", 2).toSeq match {
private def parseKeyValuePairs(
maybeKeyValues: Option[String],
configKey: String,
keyValueType: String): Map[String, String] = {
maybeKeyValues.map(keyValues => {
keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
keyValue.split("=", 2).toSeq match {
case Seq(k, v) =>
require(k != SPARK_APP_ID_LABEL, "Label with key" +
s" $SPARK_APP_ID_LABEL cannot be used in" +
" spark.kubernetes.driver.labels, as it is reserved for Spark's" +
" internal configuration.")
(k, v)
case _ =>
throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" +
" must be a comma-separated list of key-value pairs, with format <key>=<value>." +
s" Got label: $label. All labels: $labels")
throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
s" comma-separated list of key-value pairs, with format <key>=<value>." +
s" Got value: $keyValue. All values: $keyValues")
}
}).toMap
}).getOrElse(Map.empty[String, String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ package object config {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.driver.annotations")
.doc("""
| Custom annotations that will be added to the driver pod.
| This should be a comma-separated list of annotation key-value
| pairs, where each annotation is in the format key=value.
""".stripMargin)
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.driverSubmitTimeout")
.doc("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
expectationsForStaticAllocation(sparkMetricsService)
}

test("Run with custom labels") {
test("Run with custom labels and annotations") {
val args = Array(
"--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
"--deploy-mode", "cluster",
Expand All @@ -246,26 +246,35 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
"--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
"--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
"--conf", "spark.kubernetes.driver.annotations=" +
"annotation1=annotation1value," +
"annotation2=annotation2value",
"--conf", "spark.kubernetes.submit.waitAppCompletion=false",
EXAMPLES_JAR_FILE.getAbsolutePath)
SparkSubmit.main(args)
val driverPodLabels = minikubeKubernetesClient
val driverPodMetadata = minikubeKubernetesClient
.pods
.withLabel("spark-app-name", "spark-pi")
.list()
.getItems
.get(0)
.getMetadata
.getLabels
val driverPodLabels = driverPodMetadata.getLabels
// We can't match all of the selectors directly since one of the selectors is based on the
// launch time.
assert(driverPodLabels.size == 5, "Unexpected number of pod labels.")
assert(driverPodLabels.get("spark-app-name") == "spark-pi", "Unexpected value for" +
assert(driverPodLabels.size === 5, "Unexpected number of pod labels.")
assert(driverPodLabels.get("spark-app-name") === "spark-pi", "Unexpected value for" +
" spark-app-name label.")
assert(driverPodLabels.get("spark-app-id").startsWith("spark-pi"), "Unexpected value for" +
" spark-app-id label (should be prefixed with the app name).")
assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
assert(driverPodLabels.get("label1") === "label1value", "Unexpected value for label1")
assert(driverPodLabels.get("label2") === "label2value", "Unexpected value for label2")
val driverPodAnnotations = driverPodMetadata.getAnnotations
assert(driverPodAnnotations.size === 2, "Unexpected number of pod annotations.")
assert(driverPodAnnotations.get("annotation1") === "annotation1value",
"Unexpected value for annotation1")
assert(driverPodAnnotations.get("annotation2") === "annotation2value",
"Unexpected value for annotation2")
}

test("Enable SSL on the driver submit server") {
Expand Down