This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
@@ -91,7 +91,7 @@ private[spark] class KubernetesShuffleBlockHandler (
try {
Some(kubernetesClient
.pods()
.withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
.withLabels(Map(SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE).asJava)
.watch(new Watcher[Pod] {
override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
action match {
@@ -151,6 +151,13 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createWithDefault("spark")
Comment: Will this default ever be used? It seems the value will be set before it's ever accessed.

Author: It's tricky because it's always set on the submission client, but in order to get the value directly we need to use sparkConf.get(...). If the parameter is an optional config entry, we then have to call .get on the Option and require it to be present. I think it's fine if the contract is that it's always provided but there's still a default value here. At worst the executor names are incorrect, but since we do all of our logic based on the ID label, it should not matter for correctness.

Member: Compromise with ...getOrElse("spark")?

Author: That would have more or less the same effect as this, so we might as well encode the default here; code in the config class is preferred over complexity in the scheduler backend class.
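A minimal sketch of the trade-off being discussed, assuming the standard ConfigBuilder/SparkConf behaviour (the object name, the two sketch keys, and the error message are made up for illustration; only `createWithDefault("spark")` corresponds to the actual change):

```scala
package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ConfigEntryShapeSketch {
  // Optional entry: sparkConf.get(...) yields Option[String], so every read site must unwrap it.
  val PREFIX_OPTIONAL = ConfigBuilder("spark.kubernetes.sketch.prefixOptional")
    .internal()
    .stringConf
    .createOptional

  // Entry with a default: the fallback lives next to the definition and read sites stay trivial.
  val PREFIX_WITH_DEFAULT = ConfigBuilder("spark.kubernetes.sketch.prefixWithDefault")
    .internal()
    .stringConf
    .createWithDefault("spark")

  // Reading the optional shape: the caller has to assert presence (or invent a fallback) itself.
  def prefixFromOptional(conf: SparkConf): String =
    conf.get(PREFIX_OPTIONAL).getOrElse(
      sys.error("prefix should always be set by the submission client"))

  // Reading the defaulted shape: worst case we fall back to "spark", which only affects pod
  // names, not correctness, since executors are tracked by the ID label.
  def prefixFromDefault(conf: SparkConf): String = conf.get(PREFIX_WITH_DEFAULT)
}
```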


private[spark] val KUBERNETES_SHUFFLE_NAMESPACE =
ConfigBuilder("spark.kubernetes.shuffle.namespace")
.doc("Namespace of the shuffle service")
@@ -19,10 +19,12 @@ package org.apache.spark.deploy.kubernetes
package object constants {
// Labels
private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
private[spark] val SPARK_APP_ID_LABEL = "spark-app-selector"
private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
private[spark] val SPARK_ROLE_LABEL = "spark-role"
private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"
private[spark] val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
Member: Should "reserved" identifiers like "spark-app-name" in annotation keys be documented?

Author: They could be, but the error message might be self-documenting enough, and I'm not sure it's worth the space. Do you anticipate this being an annotation that people will want to set?

Member: It seems more like a low-probability corner case. If it's a lot of documentation it might not be worth it; if it's low effort, I'd consider it worth doing.

Member: Mostly I'd be worried if there is potential for silent failures.
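For reference, the checks added in this change surface misuse loudly rather than silently; a minimal sketch of the behaviour, which mirrors the Client.scala validation shown further down (it assumes Client's imports and constants, and the --conf key spelling in the comment is an assumption):

```scala
// Sketch only: mirrors the submission-time validation added in Client.scala below.
val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
  sparkConf.get(KUBERNETES_DRIVER_ANNOTATIONS),
  KUBERNETES_DRIVER_ANNOTATIONS.key,
  "annotations")
require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
    " Spark bookkeeping operations.")
// A submission that sets the reserved key, e.g. (key name assumed)
//   --conf spark.kubernetes.driver.annotations=spark-app-name=my-app
// fails at submit time with an IllegalArgumentException instead of silently overwriting
// the annotation that Spark writes.
```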


// Credentials secrets
private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
@@ -17,13 +17,13 @@
package org.apache.spark.deploy.kubernetes.submit

import java.io.File
import java.util.Collections
import java.util.{Collections, UUID}

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
@@ -43,22 +43,21 @@ import org.apache.spark.util.Utils
* where different steps of submission should be factored out into separate classes.
*/
private[spark] class Client(
appName: String,
kubernetesAppId: String,
mainClass: String,
sparkConf: SparkConf,
appArgs: Array[String],
sparkJars: Seq[String],
sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClient: KubernetesClient,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
loggingPodStatusWatcher: LoggingPodStatusWatcher)
extends Logging {

appName: String,
kubernetesResourceNamePrefix: String,
kubernetesAppId: String,
mainClass: String,
sparkConf: SparkConf,
appArgs: Array[String],
sparkJars: Seq[String],
sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClient: KubernetesClient,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(kubernetesAppId)
.getOrElse(s"$kubernetesResourceNamePrefix-driver")
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)

@@ -86,15 +85,16 @@ private[spark] class Client(
val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
s" operations.")
val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key" +
s" $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping" +
s" operations.")
val allLabels = parsedCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName,
SPARK_ROLE_LABEL -> "driver")
val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)

val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
new EnvVarBuilder()
@@ -140,6 +140,7 @@ private[spark] class Client(
.withName(kubernetesDriverPodName)
.addToLabels(allLabels.asJava)
.addToAnnotations(parsedCustomAnnotations.asJava)
.addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
Comment: Annotations don't have the same restrictions that labels do?

Author: Doesn't seem that way, since we already put full init-container specs in the annotations.
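As background, Kubernetes label values are restricted (at most 63 characters, from a limited character set), while annotation values are essentially free-form text, which is why the app name moves from a label to an annotation in this change. A minimal sketch of the distinction, not from this PR (the pod name and values are invented; the two keys match the constants added above):

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// An app name like this contains characters (spaces, parentheses, a comma) that are illegal
// in a Kubernetes label value, but it is perfectly fine as an annotation value.
val appName = "My Spark App (nightly ETL, 2017-06)"

val pod: Pod = new PodBuilder()
  .withNewMetadata()
    .withName("example-driver")                           // illustrative name only
    .addToLabels("spark-app-selector", "spark-1234abcd")  // short, restricted-syntax value
    .addToAnnotations("spark-app-name", appName)          // arbitrary text is acceptable here
    .endMetadata()
  .build()
```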

.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
@@ -186,6 +187,7 @@ private[spark] class Client(
}
resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
resolvedSparkConf.set("spark.app.id", kubernetesAppId)
resolvedSparkConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
// We don't need this anymore since we just set the JVM options on the environment
resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
val resolvedLocalClasspath = containerLocalizedFilesResolver
@@ -234,11 +236,11 @@ private[spark] class Client(
throw e
}
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
logInfo(s"Waiting for application $appName to finish...")
loggingPodStatusWatcher.awaitCompletion()
logInfo(s"Application $kubernetesAppId finished.")
logInfo(s"Application $appName finished.")
} else {
logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
logInfo(s"Deployed Spark application $appName into Kubernetes.")
}
}
}
@@ -279,15 +281,21 @@ private[spark] object Client {
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val appName = sparkConf.getOption("spark.app.name")
.getOrElse("spark")
val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The resource name prefix is derived from the application name, making it easy to connect the
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
// application the user submitted. However, we can't use the application name in the label, as
// label values are considerably restrictive, e.g. must be no longer than 63 characters in
// length. So we generate a separate identifier for the app ID itself, and bookkeeping that
// requires finding "all pods for this application" should use the kubernetesAppId.
val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
val master = resolveK8sMaster(sparkConf.get("spark.master"))
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
sparkConf,
kubernetesAppId,
kubernetesResourceNamePrefix,
namespace,
sparkJars,
sparkFiles,
@@ -300,14 +308,16 @@
None,
None)) { kubernetesClient =>
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
new DriverPodKubernetesCredentialsMounterProviderImpl(
sparkConf, kubernetesResourceNamePrefix)
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL))
.filter( _ => waitForAppCompletion)
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
kubernetesAppId, loggingInterval)
kubernetesResourceNamePrefix, loggingInterval)
new Client(
appName,
kubernetesResourceNamePrefix,
kubernetesAppId,
mainClass,
sparkConf,
@@ -16,8 +16,6 @@
*/
package org.apache.spark.deploy.kubernetes.submit

import java.io.File

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
@@ -46,12 +44,12 @@ private[spark] trait DriverInitContainerComponentsProvider {
}

private[spark] class DriverInitContainerComponentsProviderImpl(
sparkConf: SparkConf,
kubernetesAppId: String,
namespace: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
resourceStagingServerExternalSslOptions: SSLOptions)
sparkConf: SparkConf,
kubernetesResourceNamePrefix: String,
namespace: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
@@ -99,10 +97,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
s"$kubernetesAppId-init-secret"
s"$kubernetesResourceNamePrefix-init-secret"
}
private val configMapName = s"$kubernetesAppId-init-config"
private val configMapKey = s"$kubernetesAppId-init-config-key"
private val configMapName = s"$kubernetesResourceNamePrefix-init-config"
private val configMapKey = s"$kubernetesResourceNamePrefix-init-config-key"
private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
@@ -116,29 +114,29 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
} yield {
new SubmittedDependencyInitContainerConfigPluginImpl(
// Configure the init-container with the internal URI over the external URI.
maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
jarsResourceId,
filesResourceId,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
// Configure the init-container with the internal URI over the external URI.
maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
jarsResourceId,
filesResourceId,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
configMapName,
configMapKey,
submittedDependencyConfigPlugin)
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
configMapName,
configMapKey,
submittedDependencyConfigPlugin)
}

override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
Expand All @@ -158,14 +156,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] = {
maybeResourceStagingServerUri.map { stagingServerUri =>
new SubmittedDependencyUploaderImpl(
kubernetesAppId,
driverPodLabels,
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
driverPodLabels,
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
}

@@ -178,15 +175,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
filesResourceSecret <- maybeSubmittedResourceSecrets.map(_.filesResourceSecret)
} yield {
new SubmittedDependencySecretBuilderImpl(
secretName,
jarsResourceSecret,
filesResourceSecret,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert)
secretName,
jarsResourceSecret,
filesResourceSecret,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert)
}
}

@@ -196,13 +193,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
secret, INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
}
new SparkPodInitContainerBootstrapImpl(
initContainerImage,
dockerImagePullPolicy,
jarsDownloadPath,
filesDownloadPath,
downloadTimeoutMinutes,
configMapName,
configMapKey,
resourceStagingServerSecretPlugin)
initContainerImage,
dockerImagePullPolicy,
jarsDownloadPath,
filesDownloadPath,
downloadTimeoutMinutes,
configMapName,
configMapKey,
resourceStagingServerSecretPlugin)
}
}
@@ -50,7 +50,6 @@ private[spark] trait SubmittedDependencyUploader {
* Resource Staging Service.
*/
private[spark] class SubmittedDependencyUploaderImpl(
kubernetesAppId: String,
podLabels: Map[String, String],
podNamespace: String,
stagingServerUri: String,