diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c1f3a3ca653b9..dcfa70a85a970 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -66,7 +66,7 @@ The Spark master, specified either via passing the `--master` command line argum master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example, setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to -connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`. +connect without TLS on a different port, the master would be set to `k8s://http://example.com:8443`. If you have a Kubernetes cluster setup, one way to discover the apiserver URL is by executing `kubectl cluster-info`. @@ -119,20 +119,20 @@ is currently supported. ## Advanced -### Setting Up SSL For Submitting the Driver +### Setting Up TLS For Submitting the Driver When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server receives the driver's configuration, including uploaded driver jars, from the client before starting the application. -Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this +Spark supports using TLS to encrypt the traffic in this bootstrapping process. It is recommended to configure this whenever possible. See the [security page](security.html) and [configuration](configuration.html) sections for more information on -configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context +configuring TLS; use the prefix `spark.ssl.kubernetes.submission` in configuring the TLS-related fields in the context of submitting to Kubernetes. 
For example, to set the trustStore used when the local machine communicates with the driver -pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`. +pod in starting the application, set `spark.ssl.kubernetes.submission.trustStore`. One note about the keyStore is that it can be specified as either a file on the client machine or a file in the -container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:` +container image's disk. Thus `spark.ssl.kubernetes.submission.keyStore` can be a URI with a scheme of either `file:` or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme `local:`, the file is assumed to already be on the container's disk at the appropriate path. @@ -200,42 +200,88 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.caCertFile + spark.kubernetes.authenticate.submission.caCertFile (none) - CA cert file for connecting to Kubernetes over SSL. This file should be located on the submitting machine's disk. + Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). - spark.kubernetes.submit.clientKeyFile + spark.kubernetes.authenticate.submission.clientKeyFile (none) - Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). 
- spark.kubernetes.submit.clientCertFile + spark.kubernetes.authenticate.submission.clientCertFile (none) - Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This + file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not + provide a scheme). - spark.kubernetes.submit.oauthToken + spark.kubernetes.authenticate.submission.oauthToken (none) - OAuth token to use when authenticating against the against the Kubernetes API server. Note that unlike the other - authentication options, this should be the exact string value of the token to use for the authentication. + OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note + that unlike the other authentication options, this is expected to be the exact string value of the token to use for + the authentication. - spark.kubernetes.submit.serviceAccountName + spark.kubernetes.authenticate.driver.caCertFile + (none) + + Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.driver.clientKeyFile + (none) + + Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). 
If this is specified, it is highly + recommended to set up TLS for the driver submission server, as this value is sensitive information that would be + passed to the driver pod in plaintext otherwise. + + + + spark.kubernetes.authenticate.driver.clientCertFile + (none) + + Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when + requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the + driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.driver.oauthToken + (none) + + OAuth token to use when authenticating against the Kubernetes API server from the driver pod when + requesting executors. Note that unlike the other authentication options, this must be the exact string value of + the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is + highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would + be passed to the driver pod in plaintext otherwise. + + + + spark.kubernetes.authenticate.driver.serviceAccountName default Service account that is used when running the driver pod. The driver pod uses this service account when requesting - executor pods from the API server. + executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, + client cert file, and/or OAuth token. @@ -281,7 +327,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.driverSubmitTimeout + spark.kubernetes.driverSubmissionTimeout 60s Time to wait for the driver pod to start running before aborting its execution. @@ -296,7 +342,7 @@ from the other deployment modes. 
See the [configuration page](configuration.html - spark.kubernetes.submit.waitAppCompletion + spark.kubernetes.submission.waitAppCompletion true In cluster mode, whether to wait for the application to finish before exiting the launcher process. When changed to diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 6f715ebad2d75..e6b2e31568653 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -21,7 +21,6 @@ import java.security.SecureRandom import java.util.ServiceLoader import java.util.concurrent.{CountDownLatch, TimeUnit} -import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ @@ -33,7 +32,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -53,7 +52,7 @@ private[spark] class Client( .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId" - private val secretDirectory = 
s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId" + private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -119,23 +118,22 @@ private[spark] class Client( customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations") + val driverPodKubernetesCredentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get() var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) - sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) } - sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) } - sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) } - sparkConf.get(KUBERNETES_OAUTH_TOKEN).foreach { token => + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token => k8ConfBuilder = k8ConfBuilder.withOauthToken(token) - // Remove the oauth token from Spark conf so that its doesn't appear in the Spark UI. 
- sparkConf.set(KUBERNETES_OAUTH_TOKEN, "") } val k8ClientConfig = k8ConfBuilder.build @@ -174,11 +172,6 @@ private[spark] class Client( .done() kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - val driverKubernetesSelectors = (Map( - SPARK_DRIVER_LABEL -> kubernetesAppId, - SPARK_APP_ID_LABEL -> kubernetesAppId, - SPARK_APP_NAME_LABEL -> appName) - ++ parsedCustomLabels) val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, driverServiceManager, @@ -198,7 +191,8 @@ private[spark] class Client( sslConfiguration, driverService, submitterLocalFiles, - submitterLocalJars) + submitterLocalJars, + driverPodKubernetesCredentials) // Now that the application has started, persist the components that were created beyond // the shutdown hook. We still want to purge the one-time secrets, so do not unregister // those. @@ -245,7 +239,8 @@ private[spark] class Client( sslConfiguration: SslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): Unit = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): Unit = { sparkConf.getOption("spark.app.id").foreach { id => logWarning(s"Warning: Provided app id in spark.app.id as $id will be" + s" overridden as $kubernetesAppId") @@ -257,6 +252,12 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") + } + sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "") + } val driverSubmitter = buildDriverSubmissionClient( kubernetesClient, driverServiceManager, @@ -266,7 +267,10 @@ private[spark] class Client( 
driverSubmitter.ping() logInfo(s"Submitting local resources to driver pod for application " + s"$kubernetesAppId ...") - val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars) + val submitRequest = buildSubmissionRequest( + submitterLocalFiles, + submitterLocalJars, + driverPodKubernetesCredentials) driverSubmitter.submitApplication(submitRequest) logInfo("Successfully submitted local resources and driver configuration to" + " driver pod.") @@ -449,7 +453,7 @@ private[spark] class Client( .endSecret() .endVolume() .addToVolumes(sslConfiguration.sslPodVolumes: _*) - .withServiceAccount(serviceAccount) + .withServiceAccount(serviceAccount.getOrElse("default")) .addNewContainer() .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) @@ -625,7 +629,8 @@ private[spark] class Client( private def buildSubmissionRequest( submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = { val mainResourceUri = Utils.resolveURI(mainAppResource) val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme) .getOrElse("file") match { @@ -648,7 +653,8 @@ private[spark] class Client( secret = secretBase64String, sparkProperties = sparkConf.getAll.toMap, uploadedJarsBase64Contents = uploadJarsBase64Contents, - uploadedFilesBase64Contents = uploadFilesBase64Contents) + uploadedFilesBase64Contents = uploadFilesBase64Contents, + driverPodKubernetesCredentials = driverPodKubernetesCredentials) } private def buildDriverSubmissionClient( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala new file mode 100644 index 
0000000000000..cee47aad79393 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import com.google.common.io.{BaseEncoding, Files} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.internal.config.OptionalConfigEntry + +private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { + + def get(): KubernetesCredentials = { + sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ => + require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty, + "Cannot specify both a service account and a driver pod OAuth token.") + require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty, + "Cannot specify both a service account and a driver pod CA cert file.") + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty, + "Cannot specify both a service account and a driver pod client key file.") + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty, + "Cannot specify both a service account and a driver pod client cert file.") + } + val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN) + val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE, + s"Driver CA cert file provided at %s does not exist or is not a file.") + val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE, + s"Driver client key file provided at %s does not exist or is not a file.") + val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE, + s"Driver client cert file provided at %s does not exist or is not a file.") + val serviceAccountName = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + KubernetesCredentials( + oauthToken = oauthToken, + caCertDataBase64 = caCertDataBase64, + clientKeyDataBase64 = clientKeyDataBase64, + clientCertDataBase64 = clientCertDataBase64) + } + + private def safeFileConfToBase64( + conf: OptionalConfigEntry[String], + 
fileNotFoundFormatString: String): Option[String] = { + sparkConf.get(conf) + .map(new File(_)) + .map { file => + require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath)) + BaseEncoding.base64().encode(Files.toByteArray(file)) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 89369b30694ee..554ed17ff25c4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -22,33 +22,62 @@ import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -private[spark] object KubernetesClientBuilder { - private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) +private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { + private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) + private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) + private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) + private val caCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) + private val clientKeyFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) + private val clientCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) /** - * Creates a {@link KubernetesClient}, 
expecting to be from - * within the context of a pod. When doing so, credentials files - * are picked up from canonical locations, as they are injected - * into the pod's disk space. + * Creates a {@link KubernetesClient}, expecting to be from within the context of a pod. When + * doing so, service account token files can be picked up from canonical locations. */ - def buildFromWithinPod( - kubernetesNamespace: String): DefaultKubernetesClient = { - var clientConfigBuilder = new ConfigBuilder() + def buildFromWithinPod(): DefaultKubernetesClient = { + val baseClientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) - .withNamespace(kubernetesNamespace) + .withNamespace(namespace) - if (CA_CERT_FILE.isFile) { - clientConfigBuilder = clientConfigBuilder.withCaCertFile(CA_CERT_FILE.getAbsolutePath) - } + val configBuilder = oauthTokenFile + .orElse(caCertFile) + .orElse(clientKeyFile) + .orElse(clientCertFile) + .map { _ => + var mountedAuthConfigBuilder = baseClientConfigBuilder + oauthTokenFile.foreach { tokenFilePath => + val tokenFile = new File(tokenFilePath) + mountedAuthConfigBuilder = mountedAuthConfigBuilder + .withOauthToken(Files.toString(tokenFile, Charsets.UTF_8)) + } + caCertFile.foreach { caFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withCaCertFile(caFile) + } + clientKeyFile.foreach { keyFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientKeyFile(keyFile) + } + clientCertFile.foreach { certFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientCertFile(certFile) + } + mountedAuthConfigBuilder + }.getOrElse { + var serviceAccountConfigBuilder = baseClientConfigBuilder + if (SERVICE_ACCOUNT_CA_CERT.isFile) { + serviceAccountConfigBuilder = serviceAccountConfigBuilder.withCaCertFile( + SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) + } - if (API_SERVER_TOKEN.isFile) { - clientConfigBuilder = clientConfigBuilder.withOauthToken( - 
Files.toString(API_SERVER_TOKEN, Charsets.UTF_8)) + if (SERVICE_ACCOUNT_TOKEN.isFile) { + serviceAccountConfigBuilder = serviceAccountConfigBuilder.withOauthToken( + Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) + } + serviceAccountConfigBuilder } - new DefaultKubernetesClient(clientConfigBuilder.build) + new DefaultKubernetesClient(configBuilder.build) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala index 4c031fcba91ab..4bbe3ed385a4d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala @@ -49,7 +49,8 @@ private[spark] class SslConfigurationProvider( kubernetesResourceCleaner: KubernetesResourceCleaner) { private val SECURE_RANDOM = new SecureRandom() private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" - private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR + + s"/$kubernetesAppId-ssl" def getSslConfiguration(): SslConfiguration = { val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 0c4269080335f..e33c761ecc8d1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -27,233 +27,240 @@ package object config { private[spark] 
val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace") - .doc(""" - | The namespace that will be used for running the driver and - | executor pods. When using spark-submit in cluster mode, - | this can also be passed to spark-submit via the - | --kubernetes-namespace command line argument. - """.stripMargin) + .doc("The namespace that will be used for running the driver and executor pods. When using" + + " spark-submit in cluster mode, this can also be passed to spark-submit via the" + + " --kubernetes-namespace command line argument.") .stringConf .createWithDefault("default") private[spark] val DRIVER_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.driver.docker.image") - .doc(""" - | Docker image to use for the driver. Specify this using the - | standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") .stringConf .createWithDefault(s"spark-driver:$sparkVersion") private[spark] val EXECUTOR_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.executor.docker.image") - .doc(""" - | Docker image to use for the executors. Specify this using - | the standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the executors. Specify this using the standard Docker tag" + + " format.") .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private[spark] val KUBERNETES_CA_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.caCertFile") - .doc(""" - | CA cert file for connecting to Kubernetes over SSL. This - | file should be located on the submitting machine's disk. 
- """.stripMargin) + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authenticate.submission" + private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" + + private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") + .doc("Path to the CA cert file for connecting to Kubernetes over SSL when creating" + + " Kubernetes resources for the driver. This file should be located on the submitting" + + " machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientKeyFile") + .doc("Path to the client key file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. This file should be" + + " located on the submitting machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientCertFile") + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. This file should be" + + " located on the submitting machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.oauthToken") + .doc("OAuth token to use when authenticating against the against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. 
Note that unlike the other" + + " authentication options, this should be the exact string value of the token to use for" + + " the authentication.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.caCertFile") + .doc("Path to the CA cert file for connecting to Kubernetes over TLS from the driver pod" + + " when requesting executors. This file should be located on the submitting machine's disk" + + " and will be uploaded to the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientKeyFile") + .doc("Path to the client key file for authenticating against the Kubernetes API server from" + + " the driver pod when requesting executors. This file should be located on the submitting" + + " machine's disk, and will be uploaded to the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientCertFile") + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " from the driver pod when requesting executors. This file should be located on the" + + " submitting machine's disk, and will be uploaded to the driver pod.") .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_KEY_FILE = - ConfigBuilder("spark.kubernetes.submit.clientKeyFile") - .doc(""" - | Client key file for authenticating against the Kubernetes - | API server. This file should be located on the submitting - | machine's disk. - """.stripMargin) + private[spark] val KUBERNETES_DRIVER_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.oauthToken") + .doc("OAuth token to use when authenticating against the Kubernetes API server from the" + + " driver pod when requesting executors. 
Note that unlike the other authentication options" + + " this should be the exact string value of the token to use for the authentication. This" + + " token value is mounted as a secret on the driver pod.") .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.clientCertFile") - .doc(""" - | Client cert file for authenticating against the - | Kubernetes API server. This file should be located on - | the submitting machine's disk. - """.stripMargin) + private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile") + .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" + + " against Kubernetes.") .stringConf .createOptional - private[spark] val KUBERNETES_OAUTH_TOKEN = - ConfigBuilder("spark.kubernetes.submit.oauthToken") - .doc(""" - | OAuth token to use when authenticating against the - | against the Kubernetes API server. Note that unlike - | the other authentication options, this should be the - | exact string value of the token to use for the - | authentication. 
- """.stripMargin) + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile") + .doc("Path on the driver pod's disk containing the client key file to use when" + + " authenticating against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile") + .doc("Path on the driver pod's disk containing the client cert file to use when" + + " authenticating against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile") + .doc("Path on the driver pod's disk containing the OAuth token file to use when" + + " authenticating against Kubernetes.") + .internal() .stringConf .createOptional private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = - ConfigBuilder("spark.kubernetes.submit.serviceAccountName") - .doc(""" - | Service account that is used when running the driver pod. - | The driver pod uses this service account when requesting - | executor pods from the API server. - """.stripMargin) + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses" + + " this service account when requesting executor pods from the API server. If specific" + + " credentials are given for the driver pod to use, the driver will favor" + + " using those credentials instead.") .stringConf - .createWithDefault("default") + .createOptional // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. 
private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.executor.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated per executor. This is memory that accounts for - | things like VM overheads, interned strings, other native - | overheads, etc. This tends to grow with the executor size - | (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" + + " is memory that accounts for things like VM overheads, interned strings, other native" + + " overheads, etc. This tends to grow with the executor size (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.driver.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated for the driver and the driver submission server. - | This is memory that accounts for things like VM overheads, - | interned strings, other native overheads, etc. This tends - | to grow with the driver's memory size (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" + + " driver submission server. This is memory that accounts for things like VM overheads," + + " interned strings, other native overheads, etc. This tends to grow with the driver's" + + " memory size (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_LABELS = ConfigBuilder("spark.kubernetes.driver.labels") - .doc(""" - | Custom labels that will be added to the driver pod. - | This should be a comma-separated list of label key-value - | pairs, where each label is in the format key=value. Note - | that Spark also adds its own labels to the driver pod - | for bookkeeping purposes. - """.stripMargin) + .doc("Custom labels that will be added to the driver pod. 
This should be a comma-separated" + + " list of label key-value pairs, where each label is in the format key=value. Note that" + + " Spark also adds its own labels to the driver pod for bookkeeping purposes.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = ConfigBuilder("spark.kubernetes.driver.annotations") - .doc(""" - | Custom annotations that will be added to the driver pod. - | This should be a comma-separated list of annotation key-value - | pairs, where each annotation is in the format key=value. - """.stripMargin) + .doc("Custom annotations that will be added to the driver pod. This should be a" + + " comma-separated list of annotation key-value pairs, where each annotation is in the" + + " format key=value.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT = - ConfigBuilder("spark.kubernetes.driverSubmitTimeout") - .doc(""" - | Time to wait for the driver process to start running - | before aborting its execution. - """.stripMargin) + ConfigBuilder("spark.kubernetes.driverSubmissionTimeout") + .doc("Time to wait for the driver process to start running before aborting its execution.") .timeConf(TimeUnit.SECONDS) .createWithDefault(60L) private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.keyStore") - .doc(""" - | KeyStore file for the driver submission server listening - | on SSL. Can be pre-mounted on the driver container - | or uploaded from the submitting client. - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.keyStore") + .doc("KeyStore file for the driver submission server listening on SSL. 
Can be pre-mounted" + + " on the driver container or uploaded from the submitting client.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.trustStore") - .doc(""" - | TrustStore containing certificates for communicating - | to the driver submission server over SSL. - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.trustStore") + .doc("TrustStore containing certificates for communicating to the driver submission server" + + " over SSL.") .stringConf .createOptional private[spark] val DRIVER_SUBMIT_SSL_ENABLED = - ConfigBuilder("spark.ssl.kubernetes.submit.enabled") - .doc(""" - | Whether or not to use SSL when sending the - | application dependencies to the driver pod. - | - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.enabled") + .doc("Whether or not to use SSL when sending the application dependencies to the driver pod.") .booleanConf .createWithDefault(false) private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = ConfigBuilder("spark.kubernetes.driver.service.name") - .doc(""" - | Kubernetes service that exposes the driver pod - | for external access. - """.stripMargin) + .doc("Kubernetes service that exposes the driver pod for external access.") .internal() .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.submissionServerMemory") - .doc(""" - | The amount of memory to allocate for the driver submission server. - """.stripMargin) + .doc("The amount of memory to allocate for the driver submission server.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("256m") private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT = ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort") - .doc(""" - | Whether to expose the driver Web UI port as a service NodePort. Turned off by default - | because NodePort is a limited resource. 
Use alternatives such as Ingress if possible. - """.stripMargin) + .doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" + + " because NodePort is a limited resource. Use alternatives if possible.") .booleanConf .createWithDefault(false) private[spark] val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name") - .doc(""" - | Name of the driver pod. - """.stripMargin) + .doc("Name of the driver pod.") .internal() .stringConf .createOptional private[spark] val DRIVER_SERVICE_MANAGER_TYPE = ConfigBuilder("spark.kubernetes.driver.serviceManagerType") - .doc(s""" - | A tag indicating which class to use for creating the - | Kubernetes service and determining its URI for the submission - | client. - """.stripMargin) + .doc("A tag indicating which class to use for creating the Kubernetes service and" + + " determining its URI for the submission client.") .stringConf .createWithDefault(NodePortUrisDriverServiceManager.TYPE) private[spark] val WAIT_FOR_APP_COMPLETION = - ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") - .doc( - """ - | In cluster mode, whether to wait for the application to finish before exiting the - | launcher process. - """.stripMargin) + ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the" + + " launcher process.") .booleanConf .createWithDefault(true) private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.kubernetes.report.interval") - .doc( - """ - | Interval between reports of the current app status in cluster mode. 
- """.stripMargin) + .doc("Interval between reports of the current app status in cluster mode.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 4af065758e674..23d216e799fff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -24,7 +24,8 @@ package object constants { private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" // Secrets - private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = + "/var/run/secrets/spark-submission" private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume" @@ -73,7 +74,7 @@ package object constants { // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" - private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" + private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submission" private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN = 384L diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 0d2d1a1c6f5e3..1ea44109c5f5e 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -20,14 +20,21 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION +case class KubernetesCredentials( + oauthToken: Option[String], + caCertDataBase64: Option[String], + clientKeyDataBase64: Option[String], + clientCertDataBase64: Option[String]) + case class KubernetesCreateSubmissionRequest( - appResource: AppResource, - mainClass: String, - appArgs: Array[String], - sparkProperties: Map[String, String], - secret: String, - uploadedJarsBase64Contents: TarGzippedData, - uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { + appResource: AppResource, + mainClass: String, + appArgs: Array[String], + sparkProperties: Map[String, String], + secret: String, + driverPodKubernetesCredentials: KubernetesCredentials, + uploadedJarsBase64Contents: TarGzippedData, + uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 5952acc0d5916..4688521a59d38 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -31,7 +31,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil +import 
org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} private case class KubernetesSparkRestServerArguments( @@ -152,6 +154,7 @@ private[spark] class KubernetesSparkRestServer( appArgs, sparkProperties, secret, + driverPodKubernetesCredentials, uploadedJars, uploadedFiles) => val decodedSecret = Base64.decodeBase64(secret) @@ -214,6 +217,8 @@ private[spark] class KubernetesSparkRestServer( } else { resolvedSparkProperties.remove("spark.files") } + resolvedSparkProperties ++= writeKubernetesCredentials( + driverPodKubernetesCredentials, tempDir) val command = new ArrayBuffer[String] command += javaExecutable @@ -280,6 +285,48 @@ private[spark] class KubernetesSparkRestServer( CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir) } + private def writeKubernetesCredentials( + kubernetesCredentials: KubernetesCredentials, + rootTempDir: File): Map[String, String] = { + val resolvedDirectory = new File(rootTempDir, "kubernetes-credentials") + if (!resolvedDirectory.mkdir()) { + throw new IllegalStateException(s"Failed to create credentials dir at " + + resolvedDirectory.getAbsolutePath) + } + val oauthTokenFile = writeRawStringCredentialAndGetConf("oauth-token.txt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, kubernetesCredentials.oauthToken) + val caCertFile = writeBase64CredentialAndGetConf("ca.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, kubernetesCredentials.caCertDataBase64) + val clientKeyFile = writeBase64CredentialAndGetConf("key.key", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, kubernetesCredentials.clientKeyDataBase64) + val clientCertFile = writeBase64CredentialAndGetConf("cert.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, kubernetesCredentials.clientCertDataBase64) + (oauthTokenFile ++ caCertFile ++ 
clientKeyFile ++ clientCertFile).toMap + } + + private def writeRawStringCredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(cred, credentialFile, Charsets.UTF_8) + (conf.key, credentialFile.getAbsolutePath) + } + } + + private def writeBase64CredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(BaseEncoding.base64().decode(cred), credentialFile) + (conf.key, credentialFile.getAbsolutePath) + } + } /** * Retrieve the path on the driver container where the main app resource is, and what value it diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala index fa8362677f38f..1416476824793 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -39,8 +39,8 @@ private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManag val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) { "https" } else { - logWarning("Submitting application details, application secret, and local" + - " jars to the cluster over an insecure connection. You should configure SSL" + + logWarning("Submitting application details, application secret, Kubernetes credentials," + + " and local jars to the cluster over an insecure connection. 
You should configure SSL" + " to secure this step.") "http" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 90907ff83ed84..234829a541c30 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,17 +16,14 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.UUID -import java.util.concurrent.Executors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.RpcEndpointAddress @@ -76,8 +73,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - private val kubernetesClient = KubernetesClientBuilder - .buildFromWithinPod(kubernetesNamespace) + private val kubernetesClient = new KubernetesClientBuilder(conf, kubernetesNamespace) + .buildFromWithinPod() private val driverPod = try { 
kubernetesClient.pods().inNamespace(kubernetesNamespace). diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6aa1c1fee0d47..16564ca746b40 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.io.File import java.nio.file.Paths import java.util.UUID +import java.util.concurrent.TimeUnit import com.google.common.base.Charsets import com.google.common.collect.ImmutableList @@ -54,6 +55,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers") .toFile .listFiles()(0) + private val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" + private val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" + private val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" private val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile private val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8) @@ -68,6 +74,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private var clientConfig: Config = _ private var keyStoreFile: File = _ private var trustStoreFile: File = _ + private var sparkConf: SparkConf = _ override def beforeAll(): Unit = { 
Minikube.startMinikube() @@ -100,6 +107,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { || servicesList.getItems == null || servicesList.getItems.isEmpty) } + sparkConf = new SparkConf(true) + .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") + .set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile) + .set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) + .set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + .set(KUBERNETES_NAMESPACE, NAMESPACE) + .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest") + .setJars(Seq(HELPER_JAR_FILE.getAbsolutePath)) + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executor.instances", "1") + .set("spark.app.name", "spark-pi") + .set("spark.ui.enabled", "true") + .set("spark.testing", "false") + .set(WAIT_FOR_APP_COMPLETION, false) } after { @@ -112,7 +135,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .delete }) // spark-submit sets system properties so we have to clear them - new SparkConf(true).getAll.map(_._1).foreach { System.clearProperty } + new SparkConf(true) + .getAll.map(_._1) + .filter(_ != "spark.docker.test.persistMinikube") + .foreach { System.clearProperty } } override def afterAll(): Unit = { @@ -159,28 +185,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { // We'll make assertions based on spark rest api, so we need to turn on // spark.ui.enabled explicitly since the scalatest-maven-plugin would set it // to false by default. 
- val sparkConf = new SparkConf(true) - .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") - .set("spark.kubernetes.submit.caCertFile", clientConfig.getCaCertFile) - .set("spark.kubernetes.submit.clientKeyFile", clientConfig.getClientKeyFile) - .set("spark.kubernetes.submit.clientCertFile", clientConfig.getClientCertFile) - .set("spark.kubernetes.namespace", NAMESPACE) - .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") - .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") - .set("spark.jars", HELPER_JAR_FILE.getAbsolutePath) - .set("spark.executor.memory", "500m") - .set("spark.executor.cores", "1") - .set("spark.executors.instances", "1") - .set("spark.app.name", "spark-pi") - .set("spark.ui.enabled", "true") - .set("spark.testing", "false") - .set("spark.kubernetes.submit.waitAppCompletion", "false") - val mainAppResource = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" - new Client( sparkConf = sparkConf, mainClass = SPARK_PI_MAIN_CLASS, - mainAppResource = mainAppResource, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) @@ -199,64 +207,38 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--class", SPARK_PI_MAIN_CLASS, "--conf", "spark.ui.enabled=true", "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", + "--conf", 
s"${KUBERNETES_SUBMIT_CA_CERT_FILE.key}=${clientConfig.getCaCertFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_KEY_FILE.key}=${clientConfig.getClientKeyFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_CERT_FILE.key}=${clientConfig.getClientCertFile}", + "--conf", s"${EXECUTOR_DOCKER_IMAGE.key}=spark-executor:latest", + "--conf", s"${DRIVER_DOCKER_IMAGE.key}=spark-driver:latest", + "--conf", s"${WAIT_FOR_APP_COMPLETION.key}=false", EXAMPLES_JAR_FILE.getAbsolutePath) SparkSubmit.main(args) val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) } - test("Run using spark-submit with the examples jar on the docker image") { - val args = Array( - "--master", s"k8s://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", s"local:///opt/spark/examples/integration-tests-jars/${HELPER_JAR_FILE.getName}", - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - s"local:///opt/spark/examples/integration-tests-jars/${EXAMPLES_JAR_FILE.getName}") - SparkSubmit.main(args) + test("Run with the examples jar on the docker image") { + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") 
expectationsForStaticAllocation(sparkMetricsService) } test("Run with custom labels and annotations") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value", - "--conf", "spark.kubernetes.driver.annotations=" + - "annotation1=annotation1value," + - "annotation2=annotation2value", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(KUBERNETES_DRIVER_LABELS, "label1=label1value,label2=label2value") + sparkConf.set(KUBERNETES_DRIVER_ANNOTATIONS, "annotation1=annotation1value," + + "annotation2=annotation2value") + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val driverPodMetadata = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-pi") @@ -283,57 +265,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { } test("Enable SSL on the driver submit server") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", 
"512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.ssl.kubernetes.submit.enabled=true", - "--conf", "spark.ssl.kubernetes.submit.keyStore=" + - s"file://${keyStoreFile.getAbsolutePath}", - "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.trustStore=" + - s"file://${trustStoreFile.getAbsolutePath}", - "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") + sparkConf.set("spark.ssl.kubernetes.submission.keyStorePassword", "changeit") + sparkConf.set("spark.ssl.kubernetes.submission.keyPassword", "changeit") + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, + s"file://${trustStoreFile.getAbsolutePath}") + sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() } test("Added files should exist on the driver.") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-file-existence-test", - "--executor-memory", "512m", - 
"--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--files", TEST_EXISTENCE_FILE.getAbsolutePath, - "--class", FILE_EXISTENCE_MAIN_CLASS, - "--conf", "spark.ui.enabled=false", - "--conf", "spark.testing=true", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath, - TEST_EXISTENCE_FILE.getName, - TEST_EXISTENCE_FILE_CONTENTS) + sparkConf.set("spark.files", TEST_EXISTENCE_FILE.getAbsolutePath) + sparkConf.setAppName("spark-file-existence-test") val podCompletedFuture = SettableFuture.create[Boolean] val watch = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { @@ -364,8 +311,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .pods .withLabel("spark-app-name", "spark-file-existence-test") .watch(watch)) { _ => - SparkSubmit.main(args) - assert(podCompletedFuture.get, "Failed to run driver pod") + new Client( + sparkConf = sparkConf, + mainClass = FILE_EXISTENCE_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array(TEST_EXISTENCE_FILE.getName, TEST_EXISTENCE_FILE_CONTENTS)).run() + assert(podCompletedFuture.get(60, TimeUnit.SECONDS), "Failed to run driver pod") val driverPod = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-file-existence-test") @@ -386,27 +337,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { Utils.tryWithResource(minikubeKubernetesClient.services() .withLabel("spark-app-name", "spark-pi") 
.watch(externalUriProviderWatch)) { _ => - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", "spark.ui.enabled=true", - "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - "--conf", s"${DRIVER_SERVICE_MANAGER_TYPE.key}=${ExternalSuppliedUrisDriverServiceManager.TYPE}", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(DRIVER_SERVICE_MANAGER_TYPE, ExternalSuppliedUrisDriverServiceManager.TYPE) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) assert(externalUriProviderWatch.annotationSet.get) @@ -425,4 +361,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "Resolved URI annotation not set on driver service.") } } + + test("Mount the Kubernetes credentials onto the driver pod") { + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, clientConfig.getCaCertFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + new Client( + 
sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + } }