From 8fca03eeb8bf75c54e8ce8f6a399d76fc9c3d421 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 14 Mar 2017 15:10:30 -0700 Subject: [PATCH 1/9] Allow the driver pod's credentials to be shipped through secrets. --- docs/running-on-kubernetes.md | 46 +++- .../spark/deploy/kubernetes/Client.scala | 55 +++-- ...iverPodKubernetesCredentialsProvider.scala | 100 ++++++++ .../kubernetes/KubernetesClientBuilder.scala | 51 +++- .../kubernetes/SslConfigurationProvider.scala | 2 +- .../spark/deploy/kubernetes/config.scala | 86 +++++-- .../spark/deploy/kubernetes/constants.scala | 12 +- .../integrationtest/KubernetesSuite.scala | 220 +++++++----------- 8 files changed, 384 insertions(+), 188 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c1f3a3ca653b9..e51a3db634f64 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -200,14 +200,14 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.caCertFile + spark.kubernetes.apiserver.submit.caCertFile (none) CA cert file for connecting to Kubernetes over SSL. This file should be located on the submitting machine's disk. - spark.kubernetes.submit.clientKeyFile + spark.kubernetes.apiserver.submit.clientKeyFile (none) Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting @@ -215,7 +215,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.clientCertFile + spark.kubernetes.apiserver.submit.clientCertFile (none) Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting @@ -223,7 +223,7 @@ from the other deployment modes. 
See the [configuration page](configuration.html - spark.kubernetes.submit.oauthToken + spark.kubernetes.apiserver.submit.oauthToken (none) OAuth token to use when authenticating against the against the Kubernetes API server. Note that unlike the other @@ -231,11 +231,45 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.serviceAccountName + spark.kubernetes.apiserver.driver.caCertFile + (none) + + CA cert file for connecting to Kubernetes over SSL from the driver pod when requesting executors. This file should + be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + + + + spark.kubernetes.apiserver.driver.clientKeyFile + (none) + + Client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. + This file should be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + + + + spark.kubernetes.apiserver.driver.clientCertFile + (none) + + Client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. + This file should be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + + + + spark.kubernetes.apiserver.driver.oauthToken + (none) + + OAuth token to use when authenticating against the against the Kubernetes API server from the driver pod when + requesting executors. Note that unlike the other authentication options, this should be the exact string value of + the token to use for the authentication. This token value is mounted as a secret on the driver pod. + + + + spark.kubernetes.apiserver.driver.serviceAccountName default Service account that is used when running the driver pod. The driver pod uses this service account when requesting - executor pods from the API server. + executor pods from the API server. 
Note that this cannot be specified alongside a CA cert file, client key file, + client cert file, and/or OAuth token. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 6f715ebad2d75..851ba1b248ff5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -53,7 +53,7 @@ private[spark] class Client( .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId" - private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId" + private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -123,19 +123,19 @@ private[spark] class Client( .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) - sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) } - sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) } - sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) } - sparkConf.get(KUBERNETES_OAUTH_TOKEN).foreach { token => + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token => k8ConfBuilder = 
k8ConfBuilder.withOauthToken(token) // Remove the oauth token from Spark conf so that its doesn't appear in the Spark UI. - sparkConf.set(KUBERNETES_OAUTH_TOKEN, "") + sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") } val k8ClientConfig = k8ConfBuilder.build @@ -165,6 +165,8 @@ private[spark] class Client( try { val sslConfigurationProvider = new SslConfigurationProvider( sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) + val kubernetesClientCredentialsProvider = new DriverPodKubernetesCredentialsProvider( + sparkConf, kubernetesAppId) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -172,26 +174,28 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() + val driverPodCredentials = kubernetesClientCredentialsProvider.getDriverPodKubernetesCredentials() + val resolvedCredentials = kubernetesClient + .secrets() + .create(driverPodCredentials.credentialsSecret) + kubernetesResourceCleaner.registerOrUpdateResource(resolvedCredentials) kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - val driverKubernetesSelectors = (Map( - SPARK_DRIVER_LABEL -> kubernetesAppId, - SPARK_APP_ID_LABEL -> kubernetesAppId, - SPARK_APP_NAME_LABEL -> appName) - ++ parsedCustomLabels) val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, driverServiceManager, parsedCustomLabels, parsedCustomAnnotations, submitServerSecret, - sslConfiguration) + sslConfiguration, + driverPodCredentials) configureOwnerReferences( kubernetesClient, submitServerSecret, sslConfiguration.sslSecrets, driverPod, - driverService) + driverService, + driverPodCredentials.credentialsSecret) submitApplicationToDriverServer( kubernetesClient, driverServiceManager, @@ -294,7 +298,8 @@ private[spark] class Client( customLabels: Map[String, 
String], customAnnotations: Map[String, String], submitServerSecret: Secret, - sslConfiguration: SslConfiguration): (Pod, Service) = { + sslConfiguration: SslConfiguration, + driverPodKubernetesCredentials: DriverPodKubernetesCredentials): (Pod, Service) = { val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, @@ -327,7 +332,8 @@ private[spark] class Client( driverKubernetesSelectors, customAnnotations, submitServerSecret, - sslConfiguration) + sslConfiguration, + driverPodKubernetesCredentials) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -347,7 +353,8 @@ private[spark] class Client( submitServerSecret: Secret, sslSecrets: Array[Secret], driverPod: Pod, - driverService: Service): Service = { + driverService: Service, + credentialsSecret: Secret): Service = { val driverPodOwnerRef = new OwnerReferenceBuilder() .withName(driverPod.getMetadata.getName) .withUid(driverPod.getMetadata.getUid) @@ -381,6 +388,15 @@ private[spark] class Client( .endMetadata() .done() kubernetesResourceCleaner.registerOrUpdateResource(updatedService) + val updatedCredentialsSecret = kubernetesClient + .secrets() + .withName(credentialsSecret.getMetadata.getName) + .edit() + .editMetadata() + .addToOwnerReferences(driverPodOwnerRef) + .endMetadata() + .done() + kubernetesResourceCleaner.registerOrUpdateResource(updatedCredentialsSecret) updatedService } @@ -421,7 +437,8 @@ private[spark] class Client( driverKubernetesSelectors: Map[String, String], customAnnotations: Map[String, String], submitServerSecret: Secret, - sslConfiguration: SslConfiguration): Pod = { + sslConfiguration: SslConfiguration, + driverPodKubernetesCredentials: DriverPodKubernetesCredentials): Pod = { val containerPorts = buildContainerPorts() val probePingHttpGet = new HTTPGetActionBuilder() .withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP") 
@@ -448,8 +465,9 @@ private[spark] class Client( .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume() + .addToVolumes(driverPodKubernetesCredentials.credentialsSecretVolume) .addToVolumes(sslConfiguration.sslPodVolumes: _*) - .withServiceAccount(serviceAccount) + .withServiceAccount(serviceAccount.getOrElse("default")) .addNewContainer() .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) @@ -459,6 +477,7 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() + .addToVolumeMounts(driverPodKubernetesCredentials.credentialsSecretVolumeMount) .addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*) .addNewEnv() .withName(ENV_SUBMISSION_SECRET_LOCATION) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala new file mode 100644 index 0000000000000..1390c41c4178f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.config.OptionalConfigEntry + +private[spark] case class DriverPodKubernetesCredentials( + credentialsSecret: Secret, + credentialsSecretVolume: Volume, + credentialsSecretVolumeMount: VolumeMount) + +private[spark] class DriverPodKubernetesCredentialsProvider( + sparkConf: SparkConf, + kubernetesAppId: String) { + + def getDriverPodKubernetesCredentials(): DriverPodKubernetesCredentials = { + val oauthTokenSecretMapping = sparkConf + .get(KUBERNETES_DRIVER_OAUTH_TOKEN) + .map(token => (DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME, + BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8)))) + val caCertSecretMapping = convertFileConfToSecretMapping(KUBERNETES_DRIVER_CA_CERT_FILE, + DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME) + val clientKeyFileSecretMapping = convertFileConfToSecretMapping( + KUBERNETES_DRIVER_CLIENT_KEY_FILE, DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME) + val clientCertFileSecretMapping = convertFileConfToSecretMapping( + KUBERNETES_DRIVER_CLIENT_CERT_FILE, DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME) + val secretData = (oauthTokenSecretMapping ++ + caCertSecretMapping ++ + clientKeyFileSecretMapping ++ + clientCertFileSecretMapping).toMap + val credentialsSecret = new SecretBuilder() + .withNewMetadata() + .withName(s"$DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRET_NAME-$kubernetesAppId") + .endMetadata() + 
.withData(secretData.asJava) + .build() + val credentialsSecretVolume = new VolumeBuilder() + .withName(DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_VOLUME_NAME) + .withNewSecret() + .withSecretName(credentialsSecret.getMetadata.getName) + .endSecret() + .build() + val credentialsSecretVolumeMount = new VolumeMountBuilder() + .withName(credentialsSecretVolume.getName) + .withReadOnly(true) + .withMountPath(DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR) + .build() + // Cannot use both service account and mounted secrets + sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ => + require(oauthTokenSecretMapping.isEmpty, + "Cannot specify both a service account and a driver pod OAuth token.") + require(caCertSecretMapping.isEmpty, + "Cannot specify both a service account and a driver pod CA cert file.") + require(clientKeyFileSecretMapping.isEmpty, + "Cannot specify both a service account and a driver pod client key file.") + require(clientCertFileSecretMapping.isEmpty, + "Cannot specify both a service account and a driver pod client cert file.") + } + DriverPodKubernetesCredentials( + credentialsSecret, + credentialsSecretVolume, + credentialsSecretVolumeMount) + } + + private def convertFileConfToSecretMapping( + conf: OptionalConfigEntry[String], + secretName: String): Option[(String, String)] = { + sparkConf.get(conf).map(new File(_)).map { file => + if (!file.isFile()) { + throw new SparkException(s"File provided for ${conf.key} at ${file.getAbsolutePath}" + + s" does not exist or is not a file.") + } + (secretName, BaseEncoding.base64().encode(Files.toByteArray(file))) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 89369b30694ee..54c1452d58a14 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -25,8 +25,18 @@ import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesCli import org.apache.spark.deploy.kubernetes.constants._ private[spark] object KubernetesClientBuilder { - private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) + private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) + private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) + private val MOUNTED_CREDENTIALS_BASE_DIR = new File( + DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR) + private val MOUNTED_TOKEN = new File(MOUNTED_CREDENTIALS_BASE_DIR, + DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME) + private val MOUNTED_CA_CERT = new File(MOUNTED_CREDENTIALS_BASE_DIR, + DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME) + private val MOUNTED_CLIENT_KEY = new File(MOUNTED_CREDENTIALS_BASE_DIR, + DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME) + private val MOUNTED_CLIENT_CERT = new File(MOUNTED_CREDENTIALS_BASE_DIR, + DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME) /** * Creates a {@link KubernetesClient}, expecting to be from @@ -34,20 +44,41 @@ private[spark] object KubernetesClientBuilder { * are picked up from canonical locations, as they are injected * into the pod's disk space. 
*/ - def buildFromWithinPod( - kubernetesNamespace: String): DefaultKubernetesClient = { + def buildFromWithinPod(kubernetesNamespace: String): DefaultKubernetesClient = { var clientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) .withNamespace(kubernetesNamespace) - if (CA_CERT_FILE.isFile) { - clientConfigBuilder = clientConfigBuilder.withCaCertFile(CA_CERT_FILE.getAbsolutePath) - } + if (MOUNTED_TOKEN.isFile || + MOUNTED_CA_CERT.isFile || + MOUNTED_CLIENT_KEY.isFile || + MOUNTED_CLIENT_CERT.isFile) { + if (MOUNTED_TOKEN.isFile) { + clientConfigBuilder = clientConfigBuilder.withOauthToken( + Files.toString(MOUNTED_TOKEN, Charsets.UTF_8)) + } + if (MOUNTED_CA_CERT.isFile) { + clientConfigBuilder = clientConfigBuilder.withCaCertFile(MOUNTED_CA_CERT.getAbsolutePath) + } + if (MOUNTED_CLIENT_KEY.isFile) { + clientConfigBuilder = clientConfigBuilder.withClientKeyFile( + MOUNTED_CLIENT_KEY.getAbsolutePath) + } + if (MOUNTED_CLIENT_CERT.isFile) { + clientConfigBuilder = clientConfigBuilder.withClientCertFile( + MOUNTED_CLIENT_CERT.getAbsolutePath) + } + } else { + if (SERVICE_ACCOUNT_CA_CERT.isFile) { + clientConfigBuilder = clientConfigBuilder.withCaCertFile( + SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) + } - if (API_SERVER_TOKEN.isFile) { - clientConfigBuilder = clientConfigBuilder.withOauthToken( - Files.toString(API_SERVER_TOKEN, Charsets.UTF_8)) + if (SERVICE_ACCOUNT_TOKEN.isFile) { + clientConfigBuilder = clientConfigBuilder.withOauthToken( + Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) + } } new DefaultKubernetesClient(clientConfigBuilder.build) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala index 4c031fcba91ab..1351a2ea1e018 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala @@ -49,7 +49,7 @@ private[spark] class SslConfigurationProvider( kubernetesResourceCleaner: KubernetesResourceCleaner) { private val SECURE_RANDOM = new SecureRandom() private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" - private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId-ssl" def getSslConfiguration(): SslConfiguration = { val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 0c4269080335f..1548cf82a4ad7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -54,40 +54,47 @@ package object config { .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private[spark] val KUBERNETES_CA_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.caCertFile") + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.apiserver.submit" + private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.apiserver.driver" + + private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") .doc(""" - | CA cert file for connecting to Kubernetes over SSL. This + | CA cert file for connecting to Kubernetes over SSL when + | creating Kubernetes resources for the driver. 
This | file should be located on the submitting machine's disk. """.stripMargin) .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_KEY_FILE = - ConfigBuilder("spark.kubernetes.submit.clientKeyFile") + private[spark] val KUBERNETES_SUBMIT_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientKeyFile") .doc(""" | Client key file for authenticating against the Kubernetes - | API server. This file should be located on the submitting + | API server when initially creating Kubernetes resources for + | the driver. This file should be located on the submitting | machine's disk. """.stripMargin) .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.clientCertFile") + private[spark] val KUBERNETES_SUBMIT_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientCertFile") .doc(""" | Client cert file for authenticating against the - | Kubernetes API server. This file should be located on + | Kubernetes API server when initially creating Kubernetes + | resources for the driver. This file should be located on | the submitting machine's disk. """.stripMargin) .stringConf .createOptional - private[spark] val KUBERNETES_OAUTH_TOKEN = - ConfigBuilder("spark.kubernetes.submit.oauthToken") + private[spark] val KUBERNETES_SUBMIT_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.oauthToken") .doc(""" | OAuth token to use when authenticating against the - | against the Kubernetes API server. Note that unlike + | against the Kubernetes API server when initially creating + | Kubernetes resources for the driver. Note that unlike | the other authentication options, this should be the | exact string value of the token to use for the | authentication. 
@@ -95,15 +102,64 @@ package object config { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.caCertFile") + .doc(""" + | CA cert file for connecting to Kubernetes over SSL from + | the driver pod when requesting executors. This file should + | be located on the submitting machine's disk, and will be + | uploaded as a secret to the driver pod. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientKeyFile") + .doc(""" + | Client key file for authenticating against the Kubernetes + | API server from the driver pod when requesting executors. + | This file should be located on the submitting machine's disk, + | and will be uploaded as a secret to the driver pod. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientCertFile") + .doc(""" + | Client cert file for authenticating against the + | Kubernetes API server from the driver pod when requesting + | executors. This file should be located on the submitting + | machine's disk, and will be uploaded as a secret to the + | driver pod. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.oauthToken") + .doc(""" + | OAuth token to use when authenticating against the + | against the Kubernetes API server from the driver pod + | when requesting executors. Note that unlike the other + | authentication options, this should be the exact string + | value of the token to use for the authentication. This + | token value is mounted as a secret on the driver pod. 
+ """.stripMargin) + .stringConf + .createOptional + private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = - ConfigBuilder("spark.kubernetes.submit.serviceAccountName") + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.serviceAccountName") .doc(""" | Service account that is used when running the driver pod. | The driver pod uses this service account when requesting - | executor pods from the API server. + | executor pods from the API server. If specific credentials + | are given for the driver pod to use, the driver will favor + | using those credentials instead. """.stripMargin) .stringConf - .createWithDefault("default") + .createOptional // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 4af065758e674..7ec64315fcfab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -24,7 +24,17 @@ package object constants { private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" // Secrets - private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR = + "/var/run/secrets/kubernetes-credentials" + private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRET_NAME = + "driver-kubernetes-credentials" + private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_VOLUME_NAME = + "driver-kubernetes-credentials-volume" + private[spark] val DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME = "client-oauth-token" + 
private[spark] val DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME = "client-key" + private[spark] val DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME = "client-cert" + private[spark] val DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME = "ca-cert" private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6aa1c1fee0d47..8bc9e527b6e9f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.io.File import java.nio.file.Paths import java.util.UUID +import java.util.concurrent.TimeUnit import com.google.common.base.Charsets import com.google.common.collect.ImmutableList @@ -54,6 +55,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers") .toFile .listFiles()(0) + private val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" + private val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" + private val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" private val TEST_EXISTENCE_FILE = 
Paths.get("test-data", "input.txt").toFile private val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8) @@ -68,6 +74,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private var clientConfig: Config = _ private var keyStoreFile: File = _ private var trustStoreFile: File = _ + private var sparkConf: SparkConf = _ override def beforeAll(): Unit = { Minikube.startMinikube() @@ -100,6 +107,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { || servicesList.getItems == null || servicesList.getItems.isEmpty) } + sparkConf = new SparkConf(true) + .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") + .set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile) + .set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) + .set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + .set(KUBERNETES_NAMESPACE, NAMESPACE) + .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest") + .setJars(Seq(HELPER_JAR_FILE.getAbsolutePath)) + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executors.instances", "1") + .set("spark.app.name", "spark-pi") + .set("spark.ui.enabled", "true") + .set("spark.testing", "false") + .set(WAIT_FOR_APP_COMPLETION, false) } after { @@ -159,28 +182,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { // We'll make assertions based on spark rest api, so we need to turn on // spark.ui.enabled explicitly since the scalatest-maven-plugin would set it // to false by default. 
- val sparkConf = new SparkConf(true) - .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") - .set("spark.kubernetes.submit.caCertFile", clientConfig.getCaCertFile) - .set("spark.kubernetes.submit.clientKeyFile", clientConfig.getClientKeyFile) - .set("spark.kubernetes.submit.clientCertFile", clientConfig.getClientCertFile) - .set("spark.kubernetes.namespace", NAMESPACE) - .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") - .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") - .set("spark.jars", HELPER_JAR_FILE.getAbsolutePath) - .set("spark.executor.memory", "500m") - .set("spark.executor.cores", "1") - .set("spark.executors.instances", "1") - .set("spark.app.name", "spark-pi") - .set("spark.ui.enabled", "true") - .set("spark.testing", "false") - .set("spark.kubernetes.submit.waitAppCompletion", "false") - val mainAppResource = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" - new Client( sparkConf = sparkConf, mainClass = SPARK_PI_MAIN_CLASS, - mainAppResource = mainAppResource, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) @@ -199,64 +204,38 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--class", SPARK_PI_MAIN_CLASS, "--conf", "spark.ui.enabled=true", "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", + "--conf", 
s"${KUBERNETES_SUBMIT_CA_CERT_FILE.key}=${clientConfig.getCaCertFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_KEY_FILE.key}=${clientConfig.getClientKeyFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_CERT_FILE.key}=${clientConfig.getClientCertFile}", + "--conf", s"${EXECUTOR_DOCKER_IMAGE.key}=spark-executor:latest", + "--conf", s"${DRIVER_DOCKER_IMAGE.key}=spark-driver:latest", + "--conf", s"${WAIT_FOR_APP_COMPLETION.key}=false", EXAMPLES_JAR_FILE.getAbsolutePath) SparkSubmit.main(args) val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) } - test("Run using spark-submit with the examples jar on the docker image") { - val args = Array( - "--master", s"k8s://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", s"local:///opt/spark/examples/integration-tests-jars/${HELPER_JAR_FILE.getName}", - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - s"local:///opt/spark/examples/integration-tests-jars/${EXAMPLES_JAR_FILE.getName}") - SparkSubmit.main(args) + test("Run with the examples jar on the docker image") { + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") 
expectationsForStaticAllocation(sparkMetricsService) } test("Run with custom labels and annotations") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value", - "--conf", "spark.kubernetes.driver.annotations=" + - "annotation1=annotation1value," + - "annotation2=annotation2value", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(KUBERNETES_DRIVER_LABELS, "label1=label1value,label2=label2value") + sparkConf.set(KUBERNETES_DRIVER_ANNOTATIONS, "annotation1=annotation1value," + + "annotation2=annotation2value") + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val driverPodMetadata = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-pi") @@ -283,57 +262,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { } test("Enable SSL on the driver submit server") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", 
"512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.ssl.kubernetes.submit.enabled=true", - "--conf", "spark.ssl.kubernetes.submit.keyStore=" + - s"file://${keyStoreFile.getAbsolutePath}", - "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.trustStore=" + - s"file://${trustStoreFile.getAbsolutePath}", - "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") + sparkConf.set("spark.ssl.kubernetes.submit.keyStorePassword", "changeit") + sparkConf.set("spark.ssl.kubernetes.submit.keyPassword", "changeit") + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, + s"file://${trustStoreFile.getAbsolutePath}") + sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() } test("Added files should exist on the driver.") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-file-existence-test", - "--executor-memory", "512m", - "--executor-cores", "1", 
- "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--files", TEST_EXISTENCE_FILE.getAbsolutePath, - "--class", FILE_EXISTENCE_MAIN_CLASS, - "--conf", "spark.ui.enabled=false", - "--conf", "spark.testing=true", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath, - TEST_EXISTENCE_FILE.getName, - TEST_EXISTENCE_FILE_CONTENTS) + sparkConf.set("spark.files", TEST_EXISTENCE_FILE.getAbsolutePath) + sparkConf.setAppName("spark-file-existence-test") val podCompletedFuture = SettableFuture.create[Boolean] val watch = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { @@ -364,8 +308,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .pods .withLabel("spark-app-name", "spark-file-existence-test") .watch(watch)) { _ => - SparkSubmit.main(args) - assert(podCompletedFuture.get, "Failed to run driver pod") + new Client( + sparkConf = sparkConf, + mainClass = FILE_EXISTENCE_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array(TEST_EXISTENCE_FILE.getName, TEST_EXISTENCE_FILE_CONTENTS)).run() + assert(podCompletedFuture.get(60, TimeUnit.SECONDS), "Failed to run driver pod") val driverPod = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-file-existence-test") @@ -386,27 +334,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { Utils.tryWithResource(minikubeKubernetesClient.services() .withLabel("spark-app-name", "spark-pi") .watch(externalUriProviderWatch)) { _ => - 
val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", "spark.ui.enabled=true", - "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - "--conf", s"${DRIVER_SERVICE_MANAGER_TYPE.key}=${ExternalSuppliedUrisDriverServiceManager.TYPE}", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(DRIVER_SERVICE_MANAGER_TYPE, ExternalSuppliedUrisDriverServiceManager.TYPE) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) assert(externalUriProviderWatch.annotationSet.get) @@ -425,4 +358,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "Resolved URI annotation not set on driver service.") } } + + test("Mount the Kubernetes credentials onto the driver pod") { + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, clientConfig.getCaCertFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + new Client( + sparkConf = sparkConf, + mainClass = 
SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + } } From e42c03aa1e5bd0034098ec326002fe08cbfb98ea Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 14:01:11 -0700 Subject: [PATCH 2/9] Fix scalastyle --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 3 ++- .../spark/deploy/kubernetes/SslConfigurationProvider.scala | 3 ++- .../scala/org/apache/spark/deploy/kubernetes/constants.scala | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 851ba1b248ff5..82a9069e17da9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -174,7 +174,8 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() - val driverPodCredentials = kubernetesClientCredentialsProvider.getDriverPodKubernetesCredentials() + val driverPodCredentials = kubernetesClientCredentialsProvider + .getDriverPodKubernetesCredentials() val resolvedCredentials = kubernetesClient .secrets() .create(driverPodCredentials.credentialsSecret) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala index 1351a2ea1e018..67ce5eecd3257 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala @@ -49,7 +49,8 @@ private[spark] class SslConfigurationProvider( kubernetesResourceCleaner: KubernetesResourceCleaner) { private val SECURE_RANDOM = new SecureRandom() private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" - private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId-ssl" + private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR + s"/$kubernetesAppId-ssl" def getSslConfiguration(): SslConfiguration = { val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 7ec64315fcfab..7185c52976833 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -24,7 +24,8 @@ package object constants { private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" // Secrets - private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = + "/var/run/secrets/spark-submission" private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR = "/var/run/secrets/kubernetes-credentials" private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRET_NAME = From 3420d846f44d849754b79aef2c092cb937a7ec47 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 14:23:43 -0700 Subject: [PATCH 3/9] Change apiserver -> authentication --- docs/running-on-kubernetes.md | 18 +++++++++--------- .../spark/deploy/kubernetes/config.scala | 4 ++-- 2 files 
changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e51a3db634f64..c351ecc0967d9 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -200,14 +200,14 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.submit.caCertFile + spark.kubernetes.authentication.submit.caCertFile (none) CA cert file for connecting to Kubernetes over SSL. This file should be located on the submitting machine's disk. - spark.kubernetes.apiserver.submit.clientKeyFile + spark.kubernetes.authentication.submit.clientKeyFile (none) Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting @@ -215,7 +215,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.submit.clientCertFile + spark.kubernetes.authentication.submit.clientCertFile (none) Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting @@ -223,7 +223,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.submit.oauthToken + spark.kubernetes.authentication.submit.oauthToken (none) OAuth token to use when authenticating against the against the Kubernetes API server. Note that unlike the other @@ -231,7 +231,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.driver.caCertFile + spark.kubernetes.authentication.driver.caCertFile (none) CA cert file for connecting to Kubernetes over SSL from the driver pod when requesting executors. This file should @@ -239,7 +239,7 @@ from the other deployment modes. 
See the [configuration page](configuration.html - spark.kubernetes.apiserver.driver.clientKeyFile + spark.kubernetes.authentication.driver.clientKeyFile (none) Client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. @@ -247,7 +247,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.driver.clientCertFile + spark.kubernetes.authentication.driver.clientCertFile (none) Client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. @@ -255,7 +255,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.driver.oauthToken + spark.kubernetes.authentication.driver.oauthToken (none) OAuth token to use when authenticating against the against the Kubernetes API server from the driver pod when @@ -264,7 +264,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.apiserver.driver.serviceAccountName + spark.kubernetes.authentication.driver.serviceAccountName default Service account that is used when running the driver pod. 
The driver pod uses this service account when requesting diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 1548cf82a4ad7..804ae6a9a9e62 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -54,8 +54,8 @@ package object config { .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.apiserver.submit" - private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.apiserver.driver" + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authentication.submit" + private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authentication.driver" private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") From 9ca7b576a77ebfbdb75099749102a170fb2983e3 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 19:09:33 -0700 Subject: [PATCH 4/9] Address comments. Also some quality of life fixes, most notably formatting all of the documentation strings in config.scala to no longer use triple quotes. Triple quoted strings are difficult to format consistently. 
--- docs/running-on-kubernetes.md | 64 +++-- .../spark/deploy/kubernetes/Client.scala | 68 ++--- ...iverPodKubernetesCredentialsProvider.scala | 90 ++----- .../kubernetes/KubernetesClientBuilder.scala | 66 ++--- .../kubernetes/SslConfigurationProvider.scala | 2 +- .../spark/deploy/kubernetes/config.scala | 253 +++++++----------- .../spark/deploy/kubernetes/constants.scala | 2 +- .../rest/KubernetesRestProtocolMessages.scala | 21 +- .../KubernetesSparkRestServer.scala | 50 +++- .../NodePortUrisDriverServiceManager.scala | 4 +- .../KubernetesClusterSchedulerBackend.scala | 9 +- .../integrationtest/KubernetesSuite.scala | 9 +- 12 files changed, 304 insertions(+), 334 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c351ecc0967d9..80deb4e7aed5c 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -66,7 +66,7 @@ The Spark master, specified either via passing the `--master` command line argum master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example, setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to -connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`. +connect without TLS on a different port, the master would be set to `k8s://http://example.com:8443`. If you have a Kubernetes cluster setup, one way to discover the apiserver URL is by executing `kubectl cluster-info`. @@ -119,20 +119,20 @@ is currently supported. ## Advanced -### Setting Up SSL For Submitting the Driver +### Setting Up TLS For Submitting the Driver When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. 
This HTTP server receives the driver's configuration, including uploaded driver jars, from the client before starting the application. -Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this +Spark supports using TLS to encrypt the traffic in this bootstrapping process. It is recommended to configure this whenever possible. See the [security page](security.html) and [configuration](configuration.html) sections for more information on -configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context +configuring TLS; use the prefix `spark.ssl.kubernetes.submission` in configuring the TLS-related fields in the context of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver -pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`. +pod in starting the application, set `spark.ssl.kubernetes.submission.trustStore`. One note about the keyStore is that it can be specified as either a file on the client machine or a file in the -container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:` +container image's disk. Thus `spark.ssl.kubernetes.submission.keyStore` can be a URI with a scheme of either `file:` or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme `local:`, the file is assumed to already be on the container's disk at the appropriate path. @@ -200,58 +200,68 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.authentication.submit.caCertFile + spark.kubernetes.authentication.submission.caCertFile (none) - CA cert file for connecting to Kubernetes over SSL. 
This file should be located on the submitting machine's disk. + Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file + should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). - spark.kubernetes.authentication.submit.clientKeyFile + spark.kubernetes.authentication.submission.clientKeyFile (none) - Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file + should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). - spark.kubernetes.authentication.submit.clientCertFile + spark.kubernetes.authentication.submission.clientCertFile (none) - Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This + file should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not + provide a scheme). - spark.kubernetes.authentication.submit.oauthToken + spark.kubernetes.authentication.submission.oauthToken (none) - OAuth token to use when authenticating against the against the Kubernetes API server. Note that unlike the other - authentication options, this should be the exact string value of the token to use for the authentication. + OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note + that unlike the other authentication options, this should be the exact string value of the token to use for the + authentication. 
spark.kubernetes.authentication.driver.caCertFile (none) - CA cert file for connecting to Kubernetes over SSL from the driver pod when requesting executors. This file should - be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting + executors. This file should be located on the submitting machine's disk, and will be uploaded to the driver pod. + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). spark.kubernetes.authentication.driver.clientKeyFile (none) - Client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. - This file should be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting + executors. This file should be located on the submitting machine's disk, and will be uploaded to the driver pod. + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly + recommended to set up TLS for the driver submission server, as this value is sensitive information that would be + passed to the driver pod in plaintext otherwise. spark.kubernetes.authentication.driver.clientCertFile (none) - Client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. - This file should be located on the submitting machine's disk, and will be uploaded as a secret to the driver pod. + Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when + requesting executors. This file should be located on the submitting machine's disk, and will be uploaded to the + driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). 
@@ -260,7 +270,9 @@ from the other deployment modes. See the [configuration page](configuration.html OAuth token to use when authenticating against the against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this should be the exact string value of - the token to use for the authentication. This token value is mounted as a secret on the driver pod. + the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is + highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would + be passed to the driver pod in plaintext otherwise. @@ -315,7 +327,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.driverSubmitTimeout + spark.kubernetes.driverSubmissionTimeout 60s Time to wait for the driver pod to start running before aborting its execution. @@ -330,7 +342,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.waitAppCompletion + spark.kubernetes.submission.waitAppCompletion true In cluster mode, whether to wait for the application to finish before exiting the launcher process. 
When changed to diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 82a9069e17da9..a58f03a35eec4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -21,7 +21,6 @@ import java.security.SecureRandom import java.util.ServiceLoader import java.util.concurrent.{CountDownLatch, TimeUnit} -import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ @@ -33,7 +32,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -119,6 +118,7 @@ private[spark] class Client( customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations") + val driverPodKubernetesCredentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get() var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) @@ -134,8 +134,6 @@ private[spark] class Client( } sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token => k8ConfBuilder = k8ConfBuilder.withOauthToken(token) - // Remove the oauth token from Spark conf so that its 
doesn't appear in the Spark UI. - sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") } val k8ClientConfig = k8ConfBuilder.build @@ -165,8 +163,6 @@ private[spark] class Client( try { val sslConfigurationProvider = new SslConfigurationProvider( sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) - val kubernetesClientCredentialsProvider = new DriverPodKubernetesCredentialsProvider( - sparkConf, kubernetesAppId) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -174,12 +170,6 @@ private[spark] class Client( .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) .withType("Opaque") .done() - val driverPodCredentials = kubernetesClientCredentialsProvider - .getDriverPodKubernetesCredentials() - val resolvedCredentials = kubernetesClient - .secrets() - .create(driverPodCredentials.credentialsSecret) - kubernetesResourceCleaner.registerOrUpdateResource(resolvedCredentials) kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) val sslConfiguration = sslConfigurationProvider.getSslConfiguration() val (driverPod, driverService) = launchDriverKubernetesComponents( @@ -188,22 +178,21 @@ private[spark] class Client( parsedCustomLabels, parsedCustomAnnotations, submitServerSecret, - sslConfiguration, - driverPodCredentials) + sslConfiguration) configureOwnerReferences( kubernetesClient, submitServerSecret, sslConfiguration.sslSecrets, driverPod, - driverService, - driverPodCredentials.credentialsSecret) + driverService) submitApplicationToDriverServer( kubernetesClient, driverServiceManager, sslConfiguration, driverService, submitterLocalFiles, - submitterLocalJars) + submitterLocalJars, + driverPodKubernetesCredentials) // Now that the application has started, persist the components that were created beyond // the shutdown hook. We still want to purge the one-time secrets, so do not unregister // those. 
@@ -215,7 +204,7 @@ private[spark] class Client( throw e } finally { Utils.tryLogNonFatalError { - kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } Utils.tryLogNonFatalError { driverServiceManager.stop() } @@ -250,7 +239,8 @@ private[spark] class Client( sslConfiguration: SslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): Unit = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): Unit = { sparkConf.getOption("spark.app.id").foreach { id => logWarning(s"Warning: Provided app id in spark.app.id as $id will be" + s" overridden as $kubernetesAppId") } @@ -262,6 +252,12 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") + } + sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "") + } val driverSubmitter = buildDriverSubmissionClient( kubernetesClient, driverServiceManager, @@ -271,7 +267,10 @@ private[spark] class Client( driverSubmitter.ping() logInfo(s"Submitting local resources to driver pod for application " + s"$kubernetesAppId ...") - val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars) + val submitRequest = buildSubmissionRequest( + submitterLocalFiles, + submitterLocalJars, + driverPodKubernetesCredentials) driverSubmitter.submitApplication(submitRequest) logInfo("Successfully submitted local resources and driver configuration to" + " driver pod.") @@ -299,8 +298,7 @@ private[spark] class Client( customLabels: Map[String, String], customAnnotations: Map[String, String], 
submitServerSecret: Secret, - sslConfiguration: SslConfiguration, - driverPodKubernetesCredentials: DriverPodKubernetesCredentials): (Pod, Service) = { + sslConfiguration: SslConfiguration): (Pod, Service) = { val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, @@ -333,8 +331,7 @@ private[spark] class Client( driverKubernetesSelectors, customAnnotations, submitServerSecret, - sslConfiguration, - driverPodKubernetesCredentials) + sslConfiguration) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -354,8 +351,7 @@ private[spark] class Client( submitServerSecret: Secret, sslSecrets: Array[Secret], driverPod: Pod, - driverService: Service, - credentialsSecret: Secret): Service = { + driverService: Service): Service = { val driverPodOwnerRef = new OwnerReferenceBuilder() .withName(driverPod.getMetadata.getName) .withUid(driverPod.getMetadata.getUid) @@ -389,15 +385,6 @@ private[spark] class Client( .endMetadata() .done() kubernetesResourceCleaner.registerOrUpdateResource(updatedService) - val updatedCredentialsSecret = kubernetesClient - .secrets() - .withName(credentialsSecret.getMetadata.getName) - .edit() - .editMetadata() - .addToOwnerReferences(driverPodOwnerRef) - .endMetadata() - .done() - kubernetesResourceCleaner.registerOrUpdateResource(updatedCredentialsSecret) updatedService } @@ -438,8 +425,7 @@ private[spark] class Client( driverKubernetesSelectors: Map[String, String], customAnnotations: Map[String, String], submitServerSecret: Secret, - sslConfiguration: SslConfiguration, - driverPodKubernetesCredentials: DriverPodKubernetesCredentials): Pod = { + sslConfiguration: SslConfiguration): Pod = { val containerPorts = buildContainerPorts() val probePingHttpGet = new HTTPGetActionBuilder() .withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP") @@ -466,7 +452,6 @@ private[spark] class Client( 
.withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume() - .addToVolumes(driverPodKubernetesCredentials.credentialsSecretVolume) .addToVolumes(sslConfiguration.sslPodVolumes: _*) .withServiceAccount(serviceAccount.getOrElse("default")) .addNewContainer() @@ -478,7 +463,6 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() - .addToVolumeMounts(driverPodKubernetesCredentials.credentialsSecretVolumeMount) .addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*) .addNewEnv() .withName(ENV_SUBMISSION_SECRET_LOCATION) @@ -645,7 +629,8 @@ private[spark] class Client( private def buildSubmissionRequest( submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = { val mainResourceUri = Utils.resolveURI(mainAppResource) val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme) .getOrElse("file") match { @@ -668,7 +653,8 @@ private[spark] class Client( secret = secretBase64String, sparkProperties = sparkConf.getAll.toMap, uploadedJarsBase64Contents = uploadJarsBase64Contents, - uploadedFilesBase64Contents = uploadFilesBase64Contents) + uploadedFilesBase64Contents = uploadFilesBase64Contents, + driverPodKubernetesCredentials = driverPodKubernetesCredentials) } private def buildDriverSubmissionClient( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala index 1390c41c4178f..cee47aad79393 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala @@ -18,83 +18,49 @@ package org.apache.spark.deploy.kubernetes import java.io.File -import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} -import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} -import scala.collection.JavaConverters._ -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.rest.KubernetesCredentials import org.apache.spark.internal.config.OptionalConfigEntry -private[spark] case class DriverPodKubernetesCredentials( - credentialsSecret: Secret, - credentialsSecretVolume: Volume, - credentialsSecretVolumeMount: VolumeMount) +private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { -private[spark] class DriverPodKubernetesCredentialsProvider( - sparkConf: SparkConf, - kubernetesAppId: String) { - - def getDriverPodKubernetesCredentials(): DriverPodKubernetesCredentials = { - val oauthTokenSecretMapping = sparkConf - .get(KUBERNETES_DRIVER_OAUTH_TOKEN) - .map(token => (DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME, - BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8)))) - val caCertSecretMapping = convertFileConfToSecretMapping(KUBERNETES_DRIVER_CA_CERT_FILE, - DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME) - val clientKeyFileSecretMapping = convertFileConfToSecretMapping( - KUBERNETES_DRIVER_CLIENT_KEY_FILE, DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME) - val clientCertFileSecretMapping = convertFileConfToSecretMapping( - KUBERNETES_DRIVER_CLIENT_CERT_FILE, DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME) - val secretData = (oauthTokenSecretMapping ++ - caCertSecretMapping ++ - clientKeyFileSecretMapping ++ - 
clientCertFileSecretMapping).toMap - val credentialsSecret = new SecretBuilder() - .withNewMetadata() - .withName(s"$DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRET_NAME-$kubernetesAppId") - .endMetadata() - .withData(secretData.asJava) - .build() - val credentialsSecretVolume = new VolumeBuilder() - .withName(DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_VOLUME_NAME) - .withNewSecret() - .withSecretName(credentialsSecret.getMetadata.getName) - .endSecret() - .build() - val credentialsSecretVolumeMount = new VolumeMountBuilder() - .withName(credentialsSecretVolume.getName) - .withReadOnly(true) - .withMountPath(DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR) - .build() - // Cannot use both service account and mounted secrets + def get(): KubernetesCredentials = { sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ => - require(oauthTokenSecretMapping.isEmpty, + require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty, "Cannot specify both a service account and a driver pod OAuth token.") - require(caCertSecretMapping.isEmpty, + require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty, "Cannot specify both a service account and a driver pod CA cert file.") - require(clientKeyFileSecretMapping.isEmpty, + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty, "Cannot specify both a service account and a driver pod client key file.") - require(clientCertFileSecretMapping.isEmpty, + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty, "Cannot specify both a service account and a driver pod client cert file.") } - DriverPodKubernetesCredentials( - credentialsSecret, - credentialsSecretVolume, - credentialsSecretVolumeMount) + val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN) + val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE, + s"Driver CA cert file provided at %s does not exist or is not a file.") + val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE, + 
s"Driver client key file provided at %s does not exist or is not a file.") + val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE, + s"Driver client cert file provided at %s does not exist or is not a file.") + val serviceAccountName = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + KubernetesCredentials( + oauthToken = oauthToken, + caCertDataBase64 = caCertDataBase64, + clientKeyDataBase64 = clientKeyDataBase64, + clientCertDataBase64 = clientCertDataBase64) } - private def convertFileConfToSecretMapping( + private def safeFileConfToBase64( conf: OptionalConfigEntry[String], - secretName: String): Option[(String, String)] = { - sparkConf.get(conf).map(new File(_)).map { file => - if (!file.isFile()) { - throw new SparkException(s"File provided for ${conf.key} at ${file.getAbsolutePath}" + - s" does not exist or is not a file.") + fileNotFoundFormatString: String): Option[String] = { + sparkConf.get(conf) + .map(new File(_)) + .map { file => + require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath)) + BaseEncoding.base64().encode(Files.toByteArray(file)) } - (secretName, BaseEncoding.base64().encode(Files.toByteArray(file))) - } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 54c1452d58a14..465c9d875c97f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -22,21 +22,17 @@ import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ 
import org.apache.spark.deploy.kubernetes.constants._ -private[spark] object KubernetesClientBuilder { +private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) - private val MOUNTED_CREDENTIALS_BASE_DIR = new File( - DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR) - private val MOUNTED_TOKEN = new File(MOUNTED_CREDENTIALS_BASE_DIR, - DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME) - private val MOUNTED_CA_CERT = new File(MOUNTED_CREDENTIALS_BASE_DIR, - DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME) - private val MOUNTED_CLIENT_KEY = new File(MOUNTED_CREDENTIALS_BASE_DIR, - DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME) - private val MOUNTED_CLIENT_CERT = new File(MOUNTED_CREDENTIALS_BASE_DIR, - DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME) + private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) + private val caCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) + private val clientKeyFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) + private val clientCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) /** * Creates a {@link KubernetesClient}, expecting to be from @@ -44,42 +40,46 @@ private[spark] object KubernetesClientBuilder { * are picked up from canonical locations, as they are injected * into the pod's disk space. 
*/ - def buildFromWithinPod(kubernetesNamespace: String): DefaultKubernetesClient = { - var clientConfigBuilder = new ConfigBuilder() + def buildFromWithinPod(): DefaultKubernetesClient = { + val baseClientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) - .withNamespace(kubernetesNamespace) + .withNamespace(namespace) - if (MOUNTED_TOKEN.isFile || - MOUNTED_CA_CERT.isFile || - MOUNTED_CLIENT_KEY.isFile || - MOUNTED_CLIENT_CERT.isFile) { - if (MOUNTED_TOKEN.isFile) { - clientConfigBuilder = clientConfigBuilder.withOauthToken( - Files.toString(MOUNTED_TOKEN, Charsets.UTF_8)) + val configBuilder = oauthTokenFile + .orElse(caCertFile) + .orElse(clientKeyFile) + .orElse(clientCertFile) + .map { _ => + var mountedAuthConfigBuilder = baseClientConfigBuilder + oauthTokenFile.foreach { tokenFilePath => + val tokenFile = new File(tokenFilePath) + mountedAuthConfigBuilder = mountedAuthConfigBuilder + .withOauthToken(Files.toString(tokenFile, Charsets.UTF_8)) } - if (MOUNTED_CA_CERT.isFile) { - clientConfigBuilder = clientConfigBuilder.withCaCertFile(MOUNTED_CA_CERT.getAbsolutePath) + caCertFile.foreach { caFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withCaCertFile(caFile) } - if (MOUNTED_CLIENT_KEY.isFile) { - clientConfigBuilder = clientConfigBuilder.withClientKeyFile( - MOUNTED_CLIENT_KEY.getAbsolutePath) + clientKeyFile.foreach { keyFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientKeyFile(keyFile) } - if (MOUNTED_CLIENT_CERT.isFile) { - clientConfigBuilder = clientConfigBuilder.withClientCertFile( - MOUNTED_CLIENT_CERT.getAbsolutePath) + clientCertFile.foreach { certFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientCertFile(certFile) } - } else { + mountedAuthConfigBuilder + }.getOrElse { + var serviceAccountConfigBuilder = baseClientConfigBuilder if (SERVICE_ACCOUNT_CA_CERT.isFile) { - clientConfigBuilder = clientConfigBuilder.withCaCertFile( + 
serviceAccountConfigBuilder = serviceAccountConfigBuilder.withCaCertFile( SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) } if (SERVICE_ACCOUNT_TOKEN.isFile) { - clientConfigBuilder = clientConfigBuilder.withOauthToken( + serviceAccountConfigBuilder = serviceAccountConfigBuilder.withOauthToken( Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) } + serviceAccountConfigBuilder } - new DefaultKubernetesClient(clientConfigBuilder.build) + new DefaultKubernetesClient(configBuilder.build) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala index 67ce5eecd3257..4bbe3ed385a4d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala @@ -49,7 +49,7 @@ private[spark] class SslConfigurationProvider( kubernetesResourceCleaner: KubernetesResourceCleaner) { private val SECURE_RANDOM = new SecureRandom() private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" - private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR + private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR + s"/$kubernetesAppId-ssl" def getSslConfiguration(): SslConfiguration = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 804ae6a9a9e62..2f80a9f91c9d3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -27,137 +27,131 @@ package object 
config { private[spark] val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace") - .doc(""" - | The namespace that will be used for running the driver and - | executor pods. When using spark-submit in cluster mode, - | this can also be passed to spark-submit via the - | --kubernetes-namespace command line argument. - """.stripMargin) + .doc("The namespace that will be used for running the driver and executor pods. When using" + + " spark-submit in cluster mode, this can also be passed to spark-submit via the" + + " --kubernetes-namespace command line argument.") .stringConf .createWithDefault("default") private[spark] val DRIVER_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.driver.docker.image") - .doc(""" - | Docker image to use for the driver. Specify this using the - | standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") .stringConf .createWithDefault(s"spark-driver:$sparkVersion") private[spark] val EXECUTOR_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.executor.docker.image") - .doc(""" - | Docker image to use for the executors. Specify this using - | the standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the executors. Specify this using the standard Docker tag" + + " format.") .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authentication.submit" + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authentication.submission" private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authentication.driver" private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") - .doc(""" - | CA cert file for connecting to Kubernetes over SSL when - | creating Kubernetes resources for the driver. This - | file should be located on the submitting machine's disk. 
- """.stripMargin) + .doc("Path to the CA cert file for connecting to Kubernetes over SSL when creating" + + " Kubernetes resources for the driver. This file should be located on the submitting" + + " machine's disk.") .stringConf .createOptional private[spark] val KUBERNETES_SUBMIT_CLIENT_KEY_FILE = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientKeyFile") - .doc(""" - | Client key file for authenticating against the Kubernetes - | API server when initially creating Kubernetes resources for - | the driver. This file should be located on the submitting - | machine's disk. - """.stripMargin) + .doc("Path to the client key file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. This file should be" + + " located on the submitting machine's disk.") .stringConf .createOptional private[spark] val KUBERNETES_SUBMIT_CLIENT_CERT_FILE = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientCertFile") - .doc(""" - | Client cert file for authenticating against the - | Kubernetes API server when initially creating Kubernetes - | resources for the driver. This file should be located on - | the submitting machine's disk. - """.stripMargin) + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. This file should be" + + " located on the submitting machine's disk.") .stringConf .createOptional private[spark] val KUBERNETES_SUBMIT_OAUTH_TOKEN = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.oauthToken") - .doc(""" - | OAuth token to use when authenticating against the - | against the Kubernetes API server when initially creating - | Kubernetes resources for the driver. Note that unlike - | the other authentication options, this should be the - | exact string value of the token to use for the - | authentication. 
- """.stripMargin) + .doc("OAuth token to use when authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. Note that unlike the other" + + " authentication options, this should be the exact string value of the token to use for" + + " the authentication.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_CA_CERT_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.caCertFile") - .doc(""" - | CA cert file for connecting to Kubernetes over SSL from - | the driver pod when requesting executors. This file should - | be located on the submitting machine's disk, and will be - | uploaded as a secret to the driver pod. - """.stripMargin) + .doc("Path to the CA cert file for connecting to Kubernetes over TLS from the driver pod" + + " when requesting executors. This file should be located on the submitting machine's disk" + + " and will be uploaded to the driver pod.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_CLIENT_KEY_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientKeyFile") - .doc(""" - | Client key file for authenticating against the Kubernetes - | API server from the driver pod when requesting executors. - | This file should be located on the submitting machine's disk, - | and will be uploaded as a secret to the driver pod. - """.stripMargin) + .doc("Path to the client key file for authenticating against the Kubernetes API server from" + + " the driver pod when requesting executors. This file should be located on the submitting" + + " machine's disk, and will be uploaded to the driver pod.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_CLIENT_CERT_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientCertFile") - .doc(""" - | Client cert file for authenticating against the - | Kubernetes API server from the driver pod when requesting - | executors. 
This file should be located on the submitting - | machine's disk, and will be uploaded as a secret to the - | driver pod. - """.stripMargin) + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " from the driver pod when requesting executors. This file should be located on the" + + " submitting machine's disk, and will be uploaded to the driver pod.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_OAUTH_TOKEN = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.oauthToken") - .doc(""" - | OAuth token to use when authenticating against the - | against the Kubernetes API server from the driver pod - | when requesting executors. Note that unlike the other - | authentication options, this should be the exact string - | value of the token to use for the authentication. This - | token value is mounted as a secret on the driver pod. - """.stripMargin) + .doc("OAuth token to use when authenticating against the Kubernetes API server from the" + + " driver pod when requesting executors. Note that unlike the other authentication options" + + " this should be the exact string value of the token to use for the authentication. 
This" + " token value is mounted as a secret on the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile") + .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" + + " against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile") + .doc("Path on the driver pod's disk containing the client key file to use when" + + " authenticating against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile") + .doc("Path on the driver pod's disk containing the client cert file to use when" + + " authenticating against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile") + .doc("Path on the driver pod's disk containing the OAuth token file to use when" + + " authenticating against Kubernetes.") + .internal() .stringConf .createOptional private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.serviceAccountName") - .doc(""" - | Service account that is used when running the driver pod. - | The driver pod uses this service account when requesting - | executor pods from the API server. If specific credentials - | are given for the driver pod to use, the driver will favor - | using those credentials instead. - """.stripMargin) + .doc("Service account that is used when running the driver pod. The driver pod uses" + + " this service account when requesting executor pods from the API server. 
If specific" + " credentials are given for the driver pod to use, the driver will favor" + " using those credentials instead.") .stringConf .createOptional @@ -166,150 +160,107 @@ package object config { // based on the executor memory. private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.executor.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated per executor. This is memory that accounts for - | things like VM overheads, interned strings, other native - | overheads, etc. This tends to grow with the executor size - | (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" + + " is memory that accounts for things like VM overheads, interned strings, other native" + + " overheads, etc. This tends to grow with the executor size (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.driver.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated for the driver and the driver submission server. - | This is memory that accounts for things like VM overheads, - | interned strings, other native overheads, etc. This tends - | to grow with the driver's memory size (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" + + " driver submission server. This is memory that accounts for things like VM overheads," + + " interned strings, other native overheads, etc. This tends to grow with the driver's" + + " memory size (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_LABELS = ConfigBuilder("spark.kubernetes.driver.labels") - .doc(""" - | Custom labels that will be added to the driver pod. 
- | This should be a comma-separated list of label key-value - | pairs, where each label is in the format key=value. Note - | that Spark also adds its own labels to the driver pod - | for bookkeeping purposes. - """.stripMargin) + .doc("Custom labels that will be added to the driver pod. This should be a comma-separated" + + " list of label key-value pairs, where each label is in the format key=value. Note that" + + " Spark also adds its own labels to the driver pod for bookkeeping purposes.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = ConfigBuilder("spark.kubernetes.driver.annotations") - .doc(""" - | Custom annotations that will be added to the driver pod. - | This should be a comma-separated list of annotation key-value - | pairs, where each annotation is in the format key=value. - """.stripMargin) + .doc("Custom annotations that will be added to the driver pod. This should be a" + + " comma-separated list of annotation key-value pairs, where each annotation is in the" + + " format key=value.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT = - ConfigBuilder("spark.kubernetes.driverSubmitTimeout") - .doc(""" - | Time to wait for the driver process to start running - | before aborting its execution. - """.stripMargin) + ConfigBuilder("spark.kubernetes.driverSubmissionTimeout") + .doc("Time to wait for the driver process to start running before aborting its execution.") .timeConf(TimeUnit.SECONDS) .createWithDefault(60L) private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.keyStore") - .doc(""" - | KeyStore file for the driver submission server listening - | on SSL. Can be pre-mounted on the driver container - | or uploaded from the submitting client. - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.keyStore") + .doc("KeyStore file for the driver submission server listening on SSL. 
Can be pre-mounted" + + " on the driver container or uploaded from the submitting client.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.trustStore") - .doc(""" - | TrustStore containing certificates for communicating - | to the driver submission server over SSL. - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.trustStore") + .doc("TrustStore containing certificates for communicating to the driver submission server" + + " over SSL.") .stringConf .createOptional private[spark] val DRIVER_SUBMIT_SSL_ENABLED = - ConfigBuilder("spark.ssl.kubernetes.submit.enabled") - .doc(""" - | Whether or not to use SSL when sending the - | application dependencies to the driver pod. - | - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.submission.enabled") + .doc("Whether or not to use SSL when sending the application dependencies to the driver pod.") .booleanConf .createWithDefault(false) private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = ConfigBuilder("spark.kubernetes.driver.service.name") - .doc(""" - | Kubernetes service that exposes the driver pod - | for external access. - """.stripMargin) + .doc("Kubernetes service that exposes the driver pod for external access.") .internal() .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.submissionServerMemory") - .doc(""" - | The amount of memory to allocate for the driver submission server. - """.stripMargin) + .doc("The amount of memory to allocate for the driver submission server.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("256m") private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT = ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort") - .doc(""" - | Whether to expose the driver Web UI port as a service NodePort. Turned off by default - | because NodePort is a limited resource. 
Use alternatives such as Ingress if possible. - """.stripMargin) + .doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" + + " because NodePort is a limited resource. Use alternatives if possible.") .booleanConf .createWithDefault(false) private[spark] val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name") - .doc(""" - | Name of the driver pod. - """.stripMargin) + .doc("Name of the driver pod.") .internal() .stringConf .createOptional private[spark] val DRIVER_SERVICE_MANAGER_TYPE = ConfigBuilder("spark.kubernetes.driver.serviceManagerType") - .doc(s""" - | A tag indicating which class to use for creating the - | Kubernetes service and determining its URI for the submission - | client. - """.stripMargin) + .doc("A tag indicating which class to use for creating the Kubernetes service and" + + " determining its URI for the submission client.") .stringConf .createWithDefault(NodePortUrisDriverServiceManager.TYPE) private[spark] val WAIT_FOR_APP_COMPLETION = - ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") - .doc( - """ - | In cluster mode, whether to wait for the application to finish before exiting the - | launcher process. - """.stripMargin) + ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the" + + " launcher process.") .booleanConf .createWithDefault(true) private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.kubernetes.report.interval") - .doc( - """ - | Interval between reports of the current app status in cluster mode. 
- """.stripMargin) + .doc("Interval between reports of the current app status in cluster mode.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 7185c52976833..4ad215014de3b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -84,7 +84,7 @@ package object constants { // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" - private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" + private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submission" private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN = 384L diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 0d2d1a1c6f5e3..1ea44109c5f5e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -20,14 +20,21 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION +case class KubernetesCredentials( + oauthToken: Option[String], + caCertDataBase64: Option[String], + clientKeyDataBase64: Option[String], + clientCertDataBase64: Option[String]) + case class KubernetesCreateSubmissionRequest( - appResource: 
AppResource, - mainClass: String, - appArgs: Array[String], - sparkProperties: Map[String, String], - secret: String, - uploadedJarsBase64Contents: TarGzippedData, - uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { + appResource: AppResource, + mainClass: String, + appArgs: Array[String], + sparkProperties: Map[String, String], + secret: String, + driverPodKubernetesCredentials: KubernetesCredentials, + uploadedJarsBase64Contents: TarGzippedData, + uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 5952acc0d5916..0d81292da0368 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -29,9 +29,11 @@ import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} +import org.apache.spark.{SSLOptions, SecurityManager, SparkConf, SPARK_VERSION => sparkVersion} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} private case class KubernetesSparkRestServerArguments( @@ -152,6 +154,7 @@ private[spark] class KubernetesSparkRestServer( appArgs, sparkProperties, secret, + driverPodKubernetesCredentials, uploadedJars, uploadedFiles) 
=> val decodedSecret = Base64.decodeBase64(secret) @@ -163,6 +166,7 @@ private[spark] class KubernetesSparkRestServer( val resolvedAppResource = resolveAppResource(appResource, tempDir) val writtenJars = writeUploadedJars(uploadedJars, tempDir) val writtenFiles = writeUploadedFiles(uploadedFiles) + val resolvedSparkProperties = new mutable.HashMap[String, String] resolvedSparkProperties ++= sparkProperties val originalJars = sparkProperties.get("spark.jars") @@ -214,6 +218,8 @@ private[spark] class KubernetesSparkRestServer( } else { resolvedSparkProperties.remove("spark.files") } + resolvedSparkProperties ++= writeKubernetesCredentials( + driverPodKubernetesCredentials, tempDir) val command = new ArrayBuffer[String] command += javaExecutable @@ -280,6 +286,48 @@ private[spark] class KubernetesSparkRestServer( CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir) } + private def writeKubernetesCredentials( + kubernetesCredentials: KubernetesCredentials, + rootTempDir: File): Map[String, String] = { + val resolvedDirectory = new File(rootTempDir, "kubernetes-credentials") + if (!resolvedDirectory.mkdir()) { + throw new IllegalStateException(s"Failed to create credentials dir at " + + resolvedDirectory.getAbsolutePath) + } + val oauthTokenFile = writeRawStringCredentialAndGetConf("oauth-token.txt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, kubernetesCredentials.oauthToken) + val caCertFile = writeBase64CredentialAndGetConf("ca.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, kubernetesCredentials.caCertDataBase64) + val clientKeyFile = writeBase64CredentialAndGetConf("key.key", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, kubernetesCredentials.clientKeyDataBase64) + val clientCertFile = writeBase64CredentialAndGetConf("cert.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, kubernetesCredentials.clientCertDataBase64) + (oauthTokenFile ++ caCertFile ++ clientKeyFile ++ 
clientCertFile).toMap + } + + private def writeRawStringCredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(cred, credentialFile, Charsets.UTF_8) + (conf.key, credentialFile.getAbsolutePath) + } + } + + private def writeBase64CredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(BaseEncoding.base64().decode(cred), credentialFile) + (conf.key, credentialFile.getAbsolutePath) + } + } /** * Retrieve the path on the driver container where the main app resource is, and what value it diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala index fa8362677f38f..1416476824793 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -39,8 +39,8 @@ private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManag val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) { "https" } else { - logWarning("Submitting application details, application secret, and local" + - " jars to the cluster over an insecure connection. You should configure SSL" + + logWarning("Submitting application details, application secret, Kubernetes credentials," + + " and local jars to the cluster over an insecure connection. 
You should configure SSL" + " to secure this step.") "http" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 90907ff83ed84..234829a541c30 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,17 +16,14 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.UUID -import java.util.concurrent.Executors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.RpcEndpointAddress @@ -76,8 +73,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - private val kubernetesClient = KubernetesClientBuilder - .buildFromWithinPod(kubernetesNamespace) + private val kubernetesClient = new KubernetesClientBuilder(conf, kubernetesNamespace) + .buildFromWithinPod() private val driverPod = try { 
kubernetesClient.pods().inNamespace(kubernetesNamespace). diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 8bc9e527b6e9f..16564ca746b40 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -135,7 +135,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .delete }) // spark-submit sets system properties so we have to clear them - new SparkConf(true).getAll.map(_._1).foreach { System.clearProperty } + new SparkConf(true) + .getAll.map(_._1) + .filter(_ != "spark.docker.test.persistMinikube") + .foreach { System.clearProperty } } override def afterAll(): Unit = { @@ -263,8 +266,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { test("Enable SSL on the driver submit server") { sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") - sparkConf.set("spark.ssl.kubernetes.submit.keyStorePassword", "changeit") - sparkConf.set("spark.ssl.kubernetes.submit.keyPassword", "changeit") + sparkConf.set("spark.ssl.kubernetes.submission.keyStorePassword", "changeit") + sparkConf.set("spark.ssl.kubernetes.submission.keyPassword", "changeit") sparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, s"file://${trustStoreFile.getAbsolutePath}") sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) From 5a8ba57507f8e0e597530b85269b02366116e30a Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 19:11:39 -0700 Subject: [PATCH 5/9] Fix scalastyle --- .../deploy/rest/kubernetes/KubernetesSparkRestServer.scala | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 0d81292da0368..2670ffd21c27a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -29,7 +29,7 @@ import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SSLOptions, SecurityManager, SparkConf, SPARK_VERSION => sparkVersion} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest._ From caf247d6a5e707346930be2e28850b1e668ad721 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 19:13:56 -0700 Subject: [PATCH 6/9] Fix comment --- .../spark/deploy/kubernetes/KubernetesClientBuilder.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 465c9d875c97f..554ed17ff25c4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -35,10 +35,8 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St private val clientCertFile = 
sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) /** - * Creates a {@link KubernetesClient}, expecting to be from - * within the context of a pod. When doing so, credentials files - * are picked up from canonical locations, as they are injected - * into the pod's disk space. + * Creates a {@link KubernetesClient}, expecting to be from within the context of a pod. When + * doing so, service account token files can be picked up from canonical locations. */ def buildFromWithinPod(): DefaultKubernetesClient = { val baseClientConfigBuilder = new ConfigBuilder() From b06b88c866435eef921f4ef1a6b35c2dc3bd7dd8 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 19:17:03 -0700 Subject: [PATCH 7/9] Remove unnecessary constants --- .../org/apache/spark/deploy/kubernetes/constants.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 4ad215014de3b..23d216e799fff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -26,16 +26,6 @@ package object constants { // Secrets private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" - private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRETS_BASE_DIR = - "/var/run/secrets/kubernetes-credentials" - private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_SECRET_NAME = - "driver-kubernetes-credentials" - private[spark] val DRIVER_CONTAINER_KUBERNETES_CREDENTIALS_VOLUME_NAME = - "driver-kubernetes-credentials-volume" - private[spark] val DRIVER_CONTAINER_OAUTH_TOKEN_SECRET_NAME = "client-oauth-token" - private[spark] val DRIVER_CONTAINER_CLIENT_KEY_FILE_SECRET_NAME = "client-key" - 
private[spark] val DRIVER_CONTAINER_CLIENT_CERT_FILE_SECRET_NAME = "client-cert" - private[spark] val DRIVER_CONTAINER_CA_CERT_FILE_SECRET_NAME = "ca-cert" private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume" From 75c714f3673280f3170046bed7afe7bda8f8cc0f Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Mar 2017 19:17:49 -0700 Subject: [PATCH 8/9] Remove unnecessary whitespace --- .../spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 2670ffd21c27a..4688521a59d38 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -166,7 +166,6 @@ private[spark] class KubernetesSparkRestServer( val resolvedAppResource = resolveAppResource(appResource, tempDir) val writtenJars = writeUploadedJars(uploadedJars, tempDir) val writtenFiles = writeUploadedFiles(uploadedFiles) - val resolvedSparkProperties = new mutable.HashMap[String, String] resolvedSparkProperties ++= sparkProperties val originalJars = sparkProperties.get("spark.jars") From 2f33ff34419b29cec4a58872230c32993a95efe2 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 17 Mar 2017 11:27:18 -0700 Subject: [PATCH 9/9] Authentication -> Authenticate --- docs/running-on-kubernetes.md | 36 +++++++++---------- .../spark/deploy/kubernetes/Client.scala | 2 +- .../spark/deploy/kubernetes/config.scala | 6 ++-- 3 files changed, 
22 insertions(+), 22 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 80deb4e7aed5c..dcfa70a85a970 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -200,83 +200,83 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.authentication.submission.caCertFile + spark.kubernetes.authenticate.submission.caCertFile (none) Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file - should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.authentication.submission.clientKeyFile + spark.kubernetes.authenticate.submission.clientKeyFile (none) Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file - should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.authentication.submission.clientCertFile + spark.kubernetes.authenticate.submission.clientCertFile (none) Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This - file should be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not + file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.authentication.submission.oauthToken + spark.kubernetes.authenticate.submission.oauthToken (none) OAuth token to use when authenticating against the Kubernetes API server when starting the driver. 
Note - that unlike the other authentication options, this should be the exact string value of the token to use for the - authentication. + that unlike the other authentication options, this is expected to be the exact string value of the token to use for + the authentication. - spark.kubernetes.authentication.driver.caCertFile + spark.kubernetes.authenticate.driver.caCertFile (none) Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting - executors. This file should be located on the submitting machine's disk, and will be uploaded to the driver pod. + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.authentication.driver.clientKeyFile + spark.kubernetes.authenticate.driver.clientKeyFile (none) Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting - executors. This file should be located on the submitting machine's disk, and will be uploaded to the driver pod. + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would be passed to the driver pod in plaintext otherwise. - spark.kubernetes.authentication.driver.clientCertFile + spark.kubernetes.authenticate.driver.clientCertFile (none) Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when - requesting executors. This file should be located on the submitting machine's disk, and will be uploaded to the + requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. 
Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.authentication.driver.oauthToken + spark.kubernetes.authenticate.driver.oauthToken (none) OAuth token to use when authenticating against the against the Kubernetes API server from the driver pod when - requesting executors. Note that unlike the other authentication options, this should be the exact string value of + requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would be passed to the driver pod in plaintext otherwise. - spark.kubernetes.authentication.driver.serviceAccountName + spark.kubernetes.authenticate.driver.serviceAccountName default Service account that is used when running the driver pod. The driver pod uses this service account when requesting diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index a58f03a35eec4..e6b2e31568653 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -204,7 +204,7 @@ private[spark] class Client( throw e } finally { Utils.tryLogNonFatalError { -// kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } Utils.tryLogNonFatalError { driverServiceManager.stop() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 2f80a9f91c9d3..e33c761ecc8d1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -46,8 +46,8 @@ package object config { .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authentication.submission" - private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authentication.driver" + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authenticate.submission" + private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") @@ -151,7 +151,7 @@ package object config { .doc("Service account that is used when running the driver pod. The driver pod uses" + " this service account when requesting executor pods from the API server. If specific" + " credentials are given for the driver pod to use, the driver will favor" + - "using those credentials instead.") + " using those credentials instead.") .stringConf .createOptional