17 changes: 17 additions & 0 deletions docs/running-on-kubernetes.md
@@ -476,6 +476,23 @@ from the other deployment modes. See the [configuration page](configuration.html
pairs, where each annotation is in the format <code>key=value</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.labels</code></td>
<td>(none)</td>
<td>
Custom labels that will be added to the executor pods. This should be a comma-separated list of label key-value
pairs, where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the
executor pods for bookkeeping purposes.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.annotations</code></td>
<td>(none)</td>
<td>
Custom annotations that will be added to the executor pods. This should be a comma-separated list of annotation
key-value pairs, where each annotation is in the format <code>key=value</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.pod.name</code></td>
<td>(none)</td>
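For example, both new executor options can be set programmatically as well as on the command line; a minimal Scala sketch using SparkConf (the label and annotation values here are illustrative, not required names):

import org.apache.spark.SparkConf

// Hypothetical key=value pairs; any keys work as long as they do not
// collide with the labels Spark reserves for its own bookkeeping.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.labels", "team=infra,environment=staging")
  .set("spark.kubernetes.executor.annotations", "owner=someone@example.com")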
@@ -211,6 +211,22 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_EXECUTOR_LABELS =
ConfigBuilder("spark.kubernetes.executor.labels")
.doc("Custom labels that will be added to the executor pods. This should be a" +
" comma-separated list of label key-value pairs, where each label is in the format" +
" key=value.")
.stringConf
.createOptional

private[spark] val KUBERNETES_EXECUTOR_ANNOTATIONS =
ConfigBuilder("spark.kubernetes.executor.annotations")
.doc("Custom annotations that will be added to the executor pods. This should be a" +
" comma-separated list of annotation key-value pairs, where each annotation is in the" +
" format key=value.")
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
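Both new entries use .stringConf.createOptional, so reading them yields an Option[String] that downstream code parses into a map; a minimal sketch of the round trip, with an illustrative value:

// Illustrative user setting:
//   spark.kubernetes.executor.labels = "env=dev,team=infra"
// conf.get on an optional entry returns None when the property is unset.
val rawLabels: Option[String] = conf.get(KUBERNETES_EXECUTOR_LABELS)
// ConfigurationUtils.parseKeyValuePairs (sketched after the next file's diff)
// turns this into Map("env" -> "dev", "team" -> "infra").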
@@ -22,7 +22,8 @@ import java.util.Collections
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
import scala.collection.JavaConverters._

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
@@ -75,18 +76,16 @@
def run(): Unit = {
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)
-val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key,
-  "labels")
+val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
+  customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
val allLabels = parsedCustomLabels ++
Map(SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName)
-val parsedCustomAnnotations = parseKeyValuePairs(
-  customAnnotations,
-  KUBERNETES_DRIVER_ANNOTATIONS.key,
-  "annotations")
+val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+  customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
Utils.tryWithResource(kubernetesClientProvider.get) { kubernetesClient =>
val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
new EnvVarBuilder()
@@ -237,24 +236,6 @@
s" file name $fileName is shared by all of these URIs: $urisWithFileName")
}
}

-private def parseKeyValuePairs(
-    maybeKeyValues: Option[String],
-    configKey: String,
-    keyValueType: String): Map[String, String] = {
-  maybeKeyValues.map(keyValues => {
-    keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
-      keyValue.split("=", 2).toSeq match {
-        case Seq(k, v) =>
-          (k, v)
-        case _ =>
-          throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
-            s" comma-separated list of key-value pairs, with format <key>=<value>." +
-            s" Got value: $keyValue. All values: $keyValues")
-      }
-    }).toMap
-  }).getOrElse(Map.empty[String, String])
-}
}

private[spark] object Client {
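The parseKeyValuePairs method removed above now lives in the shared ConfigurationUtils imported at the top of this file. Assuming it was moved verbatim, the utility presumably looks like this (the object's exact contents beyond this method are an assumption):

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkException

private[spark] object ConfigurationUtils {
  // Parses "k1=v1,k2=v2" into Map("k1" -> "v1", "k2" -> "v2");
  // a missing or empty config value yields an empty map.
  def parseKeyValuePairs(
      maybeKeyValues: Option[String],
      configKey: String,
      keyValueType: String): Map[String, String] = {
    maybeKeyValues.map { keyValues =>
      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map { keyValue =>
        keyValue.split("=", 2).toSeq match {
          case Seq(k, v) => (k, v)
          case _ =>
            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
              s" comma-separated list of key-value pairs, with format <key>=<value>." +
              s" Got value: $keyValue. All values: $keyValues")
        }
      }.toMap
    }.getOrElse(Map.empty[String, String])
  }
}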
@@ -55,6 +55,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorExtraClasspath = conf.get(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)

private val executorLabels = ConfigurationUtils.parseKeyValuePairs(
Review comment on this line:
Separate issue, but do we have two versions of parseKeyValuePairs? There is also one in org.apache.spark.deploy.kubernetes.submit.Client.

Author reply:
Yeah, Client should be using the utility class.
conf.get(KUBERNETES_EXECUTOR_LABELS),
KUBERNETES_EXECUTOR_LABELS.key,
"executor labels")
require(
!executorLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
require(
!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
s" Spark.")
private val executorAnnotations = ConfigurationUtils.parseKeyValuePairs(
conf.get(KUBERNETES_EXECUTOR_ANNOTATIONS),
KUBERNETES_EXECUTOR_ANNOTATIONS.key,
"executor annotations")

private var shufflePodCache: Option[ShufflePodCache] = None
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
@@ -250,8 +267,10 @@
// executorId and applicationId
val hostname = name.substring(Math.max(0, name.length - 63))

-val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
-  SPARK_APP_ID_LABEL -> applicationId()).asJava
+val resolvedExecutorLabels = Map(
+  SPARK_EXECUTOR_ID_LABEL -> executorId,
+  SPARK_APP_ID_LABEL -> applicationId()) ++
+  executorLabels
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${executorMemoryMb}M")
.build()
@@ -300,7 +319,8 @@
val basePodBuilder = new PodBuilder()
.withNewMetadata()
.withName(name)
-.withLabels(selectors)
+.withLabels(resolvedExecutorLabels.asJava)
+.withAnnotations(executorAnnotations.asJava)
.withOwnerReferences()
.addNewOwnerReference()
.withController(true)
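Taken together, the scheduler backend changes merge Spark's reserved executor labels with the user-supplied ones before building the pod metadata; a minimal standalone illustration of the merge order (the literal label keys below are illustrative stand-ins for the SPARK_*_LABEL constants):

// Reserved labels come first and custom labels are appended; the
// require(...) checks above guarantee the key sets do not overlap,
// so ++ never silently overwrites a reserved label.
val reserved = Map("spark-exec-id" -> "1", "spark-app-id" -> "spark-app-abc123")
val custom = Map("team" -> "infra") // parsed from spark.kubernetes.executor.labels
val resolved = reserved ++ custom
// resolved == Map("spark-exec-id" -> "1", "spark-app-id" -> "spark-app-abc123", "team" -> "infra")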