diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 488efbe5eef36..e9002bdfe0502 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -450,6 +450,69 @@ from the other deployment modes. See the [configuration page](configuration.html client cert file, and/or OAuth token. + + spark.kubernetes.authenticate.resourceStagingServer.caCertFile + (none) + + Path to the CA cert file for connecting to the Kubernetes API server over TLS from the resource staging server when + it monitors API objects to determine when to clean up resource bundles. + + + + spark.kubernetes.authenticate.resourceStagingServer.clientKeyFile + (none) + + Path to the client key file for authenticating against the Kubernetes API server from the resource staging server + when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have + credentials that allow it to view API objects in any namespace. + + + + spark.kubernetes.authenticate.resourceStagingServer.clientCertFile + (none) + + Path to the client cert file for authenticating against the Kubernetes API server from the resource staging server + when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have + credentials that allow it to view API objects in any namespace. + + + + spark.kubernetes.authenticate.resourceStagingServer.oauthToken + (none) + + OAuth token value for authenticating against the Kubernetes API server from the resource staging server + when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have + credentials that allow it to view API objects in any namespace. Note that this cannot be set at the same time as + spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile. + + + + spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile + (none) + + File containing the OAuth token to use when authenticating against the Kubernetes API server from the + resource staging server, when it monitors API objects to determine when to clean up resource bundles. The resource + staging server must have credentials that allow it to view API objects in any namespace. Note that this cannot be + set at the same time as spark.kubernetes.authenticate.resourceStagingServer.oauthToken. + + + + spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials + true + + Whether or not to use a service account token and a service account CA certificate when the resource staging server + authenticates to Kubernetes. When this is true, interactions with Kubernetes will authenticate using a token located at + /var/run/secrets/kubernetes.io/serviceaccount/token and the CA certificate located at + /var/run/secrets/kubernetes.io/serviceaccount/ca.crt. Note that if + spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile is set, it takes precedence + over using the service account token file. Also, if + spark.kubernetes.authenticate.resourceStagingServer.caCertFile is set, it takes precedence over using + the service account's CA certificate file. This generally should be set to true (the default value) when the + resource staging server is deployed as a Kubernetes pod, but should be set to false if the resource staging server + is deployed by other means (for example, when running the staging server process outside of Kubernetes).
The resource + staging server must have credentials that allow it to view API objects in any namespace. + + spark.kubernetes.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala index 94292dae10f29..01a8a9a6899fd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy.kubernetes +import java.io.File import java.nio.ByteBuffer import io.fabric8.kubernetes.api.model.Pod -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientException, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.IOUtils import scala.collection.JavaConverters._ @@ -28,13 +29,13 @@ import scala.collection.mutable import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.internal.Logging import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver} import org.apache.spark.network.util.TransportConf -import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider /** * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes. 
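For illustration only: the resource staging server credentials documented in the table above could be supplied through SparkConf when the staging server runs outside of a pod. The property names come from the table; the file paths are placeholders.

import org.apache.spark.SparkConf

// Hypothetical wiring of the staging server's API-server credentials; paths are placeholders.
val stagingServerConf = new SparkConf(false)
  .set("spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials", "false")
  .set("spark.kubernetes.authenticate.resourceStagingServer.caCertFile", "/etc/spark-staging/ca.crt")
  .set("spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile", "/etc/spark-staging/token")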
@@ -42,19 +43,16 @@ import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientPr */ private[spark] class KubernetesShuffleBlockHandler ( transportConf: TransportConf, - kubernetesClientProvider: DriverPodKubernetesClientProvider) + kubernetesClient: KubernetesClient) extends ExternalShuffleBlockHandler(transportConf, null) with Logging { private val INIT_AND_STOP_LOCK = new Object private val CONNECTED_APPS_LOCK = new Object private val connectedApps = mutable.Set.empty[String] private var shuffleWatch: Option[Watch] = None - private var kubernetesClient: Option[KubernetesClient] = None def start(): Unit = INIT_AND_STOP_LOCK.synchronized { - val client = kubernetesClientProvider.get - shuffleWatch = startShuffleWatcher(client) - kubernetesClient = Some(client) + shuffleWatch = startShuffleWatcher() } override def close(): Unit = { @@ -64,8 +62,7 @@ private[spark] class KubernetesShuffleBlockHandler ( INIT_AND_STOP_LOCK.synchronized { shuffleWatch.foreach(IOUtils.closeQuietly) shuffleWatch = None - kubernetesClient.foreach(IOUtils.closeQuietly) - kubernetesClient = None + IOUtils.closeQuietly(kubernetesClient) } } } @@ -90,9 +87,9 @@ private[spark] class KubernetesShuffleBlockHandler ( } } - private def startShuffleWatcher(client: KubernetesClient): Option[Watch] = { + private def startShuffleWatcher(): Option[Watch] = { try { - Some(client + Some(kubernetesClient .pods() .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava) .watch(new Watcher[Pod] { @@ -137,31 +134,47 @@ private[spark] class KubernetesShuffleBlockHandler ( */ private[spark] class KubernetesExternalShuffleService( conf: SparkConf, - securityManager: SecurityManager, - kubernetesClientProvider: DriverPodKubernetesClientProvider) + securityManager: SecurityManager) extends ExternalShuffleService(conf, securityManager) { private var shuffleBlockHandlers: mutable.Buffer[KubernetesShuffleBlockHandler] = _ protected override def newShuffleBlockHandler( tConf: TransportConf): ExternalShuffleBlockHandler = { - val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClientProvider) - newBlockHandler.start() - - // TODO: figure out a better way of doing this. - // This is necessary because the constructor is not called - // when this class is initialized through ExternalShuffleService. - if (shuffleBlockHandlers == null) { + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( + conf.get(KUBERNETES_SHUFFLE_APISERVER_URI), + None, + APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX, + conf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)) + .filter( _ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)) + .filter( _ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS))) + val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClient) + try { + newBlockHandler.start() + // TODO: figure out a better way of doing this. + // This is necessary because the constructor is not called + // when this class is initialized through ExternalShuffleService. 
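// Illustration (not part of the patch): the credential-selection pattern used in
// newShuffleBlockHandler above. The service account token and CA certificate files are
// only passed to SparkKubernetesClientFactory when the corresponding boolean conf is true.
// The flag and paths below are hard-coded stand-ins for the constants referenced above.
import java.io.File

val useServiceAccountCredentials = true // stands in for KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS
val maybeServiceAccountToken: Option[File] =
  Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token"))
    .filter(_ => useServiceAccountCredentials)
val maybeServiceAccountCaCert: Option[File] =
  Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"))
    .filter(_ => useServiceAccountCredentials)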
+ if (shuffleBlockHandlers == null) { shuffleBlockHandlers = mutable.Buffer.empty[KubernetesShuffleBlockHandler] + } + shuffleBlockHandlers += newBlockHandler + newBlockHandler + } catch { + case e: Throwable => + logError("Failed to create Kubernetes shuffle block handler.", e) + newBlockHandler.close() + throw e } - shuffleBlockHandlers += newBlockHandler - newBlockHandler } override def stop(): Unit = { try { super.stop() } finally { - shuffleBlockHandlers.foreach(_.close()) + if (shuffleBlockHandlers != null) { + shuffleBlockHandlers.foreach(_.close()) + } } } } @@ -169,10 +182,7 @@ private[spark] class KubernetesExternalShuffleService( private[spark] object KubernetesExternalShuffleService extends Logging { def main(args: Array[String]): Unit = { ExternalShuffleService.main(args, - (conf: SparkConf, sm: SecurityManager) => { - val kubernetesClientProvider = new DriverPodKubernetesClientProvider(conf) - new KubernetesExternalShuffleService(conf, sm, kubernetesClientProvider) - }) + (conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm)) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala new file mode 100644 index 0000000000000..d2729a2db2fa0 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.util.ThreadUtils + +/** + * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to + * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL + * options for different components. 
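For example, with kubernetesAuthConfPrefix set to "spark.kubernetes.authenticate.shuffleService", the factory below resolves keys such as spark.kubernetes.authenticate.shuffleService.oauthTokenFile and spark.kubernetes.authenticate.shuffleService.caCertFile. A minimal sketch of the mutual-exclusion check it relies on, assuming OptionRequirements.requireNandDefined simply rejects two simultaneously defined options:

// Sketch only; the real OptionRequirements helper lives elsewhere in this module.
def requireNandDefinedSketch(opt1: Option[_], opt2: Option[_], errMessage: String): Unit =
  require(!(opt1.isDefined && opt2.isDefined), errMessage)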
+ */ +private[spark] object SparkKubernetesClientFactory { + + def createKubernetesClient( + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + maybeServiceAccountToken: Option[File], + maybeServiceAccountCaCert: Option[File]): KubernetesClient = { + val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX" + val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX" + val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf) + .map(new File(_)) + .orElse(maybeServiceAccountToken) + val oauthTokenValue = sparkConf.getOption(oauthTokenConf) + OptionRequirements.requireNandDefined( + oauthTokenFile, + oauthTokenValue, + s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" + + s" value $oauthTokenConf.") + + val caCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX") + .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath)) + val clientKeyFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") + val clientCertFile = sparkConf + .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") + val dispatcher = new Dispatcher( + ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) + val config = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(master) + .withWebsocketPingInterval(0) + .withOption(oauthTokenValue) { + (token, configBuilder) => configBuilder.withOauthToken(token) + }.withOption(oauthTokenFile) { + (file, configBuilder) => + configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8)) + }.withOption(caCertFile) { + (file, configBuilder) => configBuilder.withCaCertFile(file) + }.withOption(clientKeyFile) { + (file, configBuilder) => configBuilder.withClientKeyFile(file) + }.withOption(clientCertFile) { + (file, configBuilder) => configBuilder.withClientCertFile(file) + }.withOption(namespace) { + (ns, configBuilder) => configBuilder.withNamespace(ns) + }.build() + val baseHttpClient = HttpClientUtils.createHttpClient(config) + val httpClientWithCustomDispatcher = baseHttpClient.newBuilder() + .dispatcher(dispatcher) + .build() + new DefaultKubernetesClient(httpClientWithCustomDispatcher, config) + } + + private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) { + + def withOption[T] + (option: Option[T]) + (configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = { + new OptionConfigurableConfigBuilder(option.map { opt => + configurator(opt, configBuilder) + }.getOrElse(configBuilder)) + } + + def build(): Config = configBuilder.build() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index d1341b15afaca..dd99e0f7a5ae0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -47,120 +47,32 @@ package object config extends Logging { .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authenticate.submission" - private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" - - private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = - ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") 
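// A self-contained sketch of the builder-enrichment idea in SparkKubernetesClientFactory
// above: apply a configuration step only when an Option is defined, preserving the fluent
// style without mutable reassignment of the builder. The builder type here is a plain
// String purely for illustration.
def withOptionSketch[T, B](builder: B, option: Option[T])(configure: (T, B) => B): B =
  option.map(value => configure(value, builder)).getOrElse(builder)

// withOptionSketch("base", Some("token"))((t, b) => s"$b+$t") == "base+token"
// withOptionSketch("base", None: Option[String])((t, b) => s"$b+$t") == "base"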
- .doc("Path to the CA cert file for connecting to Kubernetes over SSL when creating" + - " Kubernetes resources for the driver. This file should be located on the submitting" + - " machine's disk.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_SUBMIT_CLIENT_KEY_FILE = - ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientKeyFile") - .doc("Path to the client key file for authenticating against the Kubernetes API server" + - " when initially creating Kubernetes resources for the driver. This file should be" + - " located on the submitting machine's disk.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_SUBMIT_CLIENT_CERT_FILE = - ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientCertFile") - .doc("Path to the client cert file for authenticating against the Kubernetes API server" + - " when initially creating Kubernetes resources for the driver. This file should be" + - " located on the submitting machine's disk.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_SUBMIT_OAUTH_TOKEN = - ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.oauthToken") - .doc("OAuth token to use when authenticating against the against the Kubernetes API server" + - " when initially creating Kubernetes resources for the driver. Note that unlike the other" + - " authentication options, this should be the exact string value of the token to use for" + - " the authentication.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_CA_CERT_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.caCertFile") - .doc("Path to the CA cert file for connecting to Kubernetes over TLS from the driver pod" + - " when requesting executors. This file should be located on the submitting machine's disk" + - " and will be uploaded to the driver pod.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_CLIENT_KEY_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientKeyFile") - .doc("Path to the client key file for authenticating against the Kubernetes API server from" + - " the driver pod when requesting executors. This file should be located on the submitting" + - " machine's disk, and will be uploaded to the driver pod.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_CLIENT_CERT_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientCertFile") - .doc("Path to the client cert file for authenticating against the Kubernetes API server" + - " from the driver pod when requesting executors. This file should be located on the" + - " submitting machine's disk, and will be uploaded to the driver pod.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_OAUTH_TOKEN = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.oauthToken") - .doc("OAuth token to use when authenticating against the Kubernetes API server from the" + - " driver pod when requesting executors. Note that unlike the other authentication options" + - " this should be the exact string value of the token to use for the authentication. This" + - " token value is mounted as a secret on the driver pod.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile") - .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" + - " against Kubernetes. 
Typically this is configured by spark-submit from mounting a" + - " secret from the submitting machine into the pod, and hence this configuration is marked" + - " as internal, but this can also be set manually to use a certificate that is mounted" + - " into the driver pod via other means.") - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile") - .doc("Path on the driver pod's disk containing the client key file to use when" + - " authenticating against Kubernetes. Typically this is configured by spark-submit from" + - " mounting a secret from the submitting machine into the pod, and hence this" + - " configuration is marked as internal, but this can also be set manually to" + - " use a key file that is mounted into the driver pod via other means.") - .internal() - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile") - .doc("Path on the driver pod's disk containing the client cert file to use when" + - " authenticating against Kubernetes. Typically this is configured by spark-submit from" + - " mounting a secret from the submitting machine into the pod, and hence this" + - " configuration is marked as internal, but this can also be set manually to" + - " use a certificate that is mounted into the driver pod via other means.") - .internal() - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile") - .doc("Path on the driver pod's disk containing the OAuth token file to use when" + - " authenticating against Kubernetes. 
Typically this is configured by spark-submit from" + - " mounting a secret from the submitting machine into the pod, and hence this" + - " configuration is marked as internal, but this can also be set manually to" + - " use a token that is mounted into the driver pod via other means.") - .internal() - .stringConf - .createOptional + private[spark] val APISERVER_AUTH_SUBMISSION_CONF_PREFIX = + "spark.kubernetes.authenticate.submission" + private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + private[spark] val APISERVER_AUTH_RESOURCE_STAGING_SERVER_CONF_PREFIX = + "spark.kubernetes.authenticate.resourceStagingServer" + private[spark] val APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX = + "spark.kubernetes.authenticate.shuffleService" + private[spark] val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + private[spark] val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + private[spark] val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + private[spark] val RESOURCE_STAGING_SERVER_USE_SERVICE_ACCOUNT_CREDENTIALS = + ConfigBuilder( + s"$APISERVER_AUTH_RESOURCE_STAGING_SERVER_CONF_PREFIX.useServiceAccountCredentials") + .doc("Use a service account token and CA certificate in the resource staging server to" + + " watch the API server's objects.") + .booleanConf + .createWithDefault(true) private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = - ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.serviceAccountName") + ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses" + " this service account when requesting executor pods from the API server. If specific" + " credentials are given for the driver pod to use, the driver will favor" + @@ -259,6 +171,19 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI = + ConfigBuilder("spark.kubernetes.shuffle.apiServer.url") + .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.") + .stringConf + .createWithDefault(KUBERNETES_MASTER_INTERNAL_URL) + + private[spark] val KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS = + ConfigBuilder(s"$APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX.useServiceAccountCredentials") + .doc("Whether or not to use service account credentials when contacting the API server from" + + " the shuffle service.") + .booleanConf + .createWithDefault(true) + private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of dynamic allocation. ") @@ -285,12 +210,36 @@ package object config extends Logging { .createWithDefaultString("1s") // Spark resource staging server. + private[spark] val RESOURCE_STAGING_SERVER_API_SERVER_URL = + ConfigBuilder("spark.kubernetes.resourceStagingServer.apiServer.url") + .doc("URL for the Kubernetes API server. The resource staging server monitors the API" + + " server to check when pods no longer are using mounted resources. 
Note that this isn't" + + " to be used in Spark applications, as the API server URL should be set via spark.master.") + .stringConf + .createWithDefault(KUBERNETES_MASTER_INTERNAL_URL) + + private[spark] val RESOURCE_STAGING_SERVER_API_SERVER_CA_CERT_FILE = + ConfigBuilder("spark.kubernetes.resourceStagingServer.apiServer.caCertFile") + .doc("CA certificate for the resource staging server to use when contacting the Kubernetes" + + " API server over TLS.") + .stringConf + .createOptional + private[spark] val RESOURCE_STAGING_SERVER_PORT = ConfigBuilder("spark.kubernetes.resourceStagingServer.port") .doc("Port for the Kubernetes resource staging server to listen on.") .intConf .createWithDefault(10000) + private[spark] val RESOURCE_STAGING_SERVER_INITIAL_ACCESS_EXPIRATION_TIMEOUT = + ConfigBuilder("spark.kubernetes.resourceStagingServer.initialAccessExpirationTimeout") + .doc("The resource staging server will wait for any resource bundle to be accessed for a" + + " first time for this period. If this timeout expires before the resources are accessed" + + " the first time, the resources are cleaned up under the assumption that the dependents" + + " of the given resource bundle failed to launch at all.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30m") + private[spark] val RESOURCE_STAGING_SERVER_KEY_PEM = ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPem") .doc("Key PEM file to use when having the Kubernetes dependency server listen on TLS.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index dc8a6da45495e..85dac3df57b4c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -20,10 +20,11 @@ import java.io.File import java.util.Collections import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ -import org.apache.spark.SparkConf -import org.apache.spark.deploy.kubernetes.ConfigurationUtils +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl @@ -42,18 +43,18 @@ import org.apache.spark.util.Utils * where different steps of submission should be factored out into separate classes. 
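The constructor change below reflects the new ownership model: the caller creates the KubernetesClient and closes it when done, instead of each component pulling one from a provider. A minimal sketch of the resource-management helper semantics assumed here; Spark's Utils.tryWithResource behaves along these lines:

// Sketch only; creates the resource, runs the body, and always closes the resource.
def tryWithResourceSketch[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}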
*/ private[spark] class Client( - appName: String, - kubernetesAppId: String, - mainClass: String, - sparkConf: SparkConf, - appArgs: Array[String], - sparkJars: Seq[String], - sparkFiles: Seq[String], - waitForAppCompletion: Boolean, - kubernetesClientProvider: SubmissionKubernetesClientProvider, - initContainerComponentsProvider: DriverInitContainerComponentsProvider, - kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider, - loggingPodStatusWatcher: LoggingPodStatusWatcher) + appName: String, + kubernetesAppId: String, + mainClass: String, + sparkConf: SparkConf, + appArgs: Array[String], + sparkJars: Seq[String], + sparkFiles: Seq[String], + waitForAppCompletion: Boolean, + kubernetesClient: KubernetesClient, + initContainerComponentsProvider: DriverInitContainerComponentsProvider, + kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider, + loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME) @@ -89,142 +90,134 @@ private[spark] class Client( val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs( customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations") - Utils.tryWithResource(kubernetesClientProvider.get) { kubernetesClient => - val driverExtraClasspathEnv = driverExtraClasspath.map { classPath => - new EnvVarBuilder() - .withName(ENV_SUBMIT_EXTRA_CLASSPATH) - .withValue(classPath) - .build() - } - val driverContainer = new ContainerBuilder() - .withName(DRIVER_CONTAINER_NAME) - .withImage(driverDockerImage) - .withImagePullPolicy("IfNotPresent") - .addToEnv(driverExtraClasspathEnv.toSeq: _*) - .addNewEnv() - .withName(ENV_DRIVER_MEMORY) - .withValue(driverContainerMemoryWithOverhead + "m") - .endEnv() - .addNewEnv() - .withName(ENV_DRIVER_MAIN_CLASS) - .withValue(mainClass) - .endEnv() - .addNewEnv() - .withName(ENV_DRIVER_ARGS) - .withValue(appArgs.mkString(" ")) - .endEnv() + val driverExtraClasspathEnv = driverExtraClasspath.map { classPath => + new EnvVarBuilder() + .withName(ENV_SUBMIT_EXTRA_CLASSPATH) + .withValue(classPath) .build() - val basePod = new PodBuilder() - .withNewMetadata() - .withName(kubernetesDriverPodName) - .addToLabels(allLabels.asJava) - .addToAnnotations(parsedCustomAnnotations.asJava) - .endMetadata() - .withNewSpec() - .withRestartPolicy("Never") - .addToContainers(driverContainer) - .endSpec() + } + val driverContainer = new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME) + .withImage(driverDockerImage) + .withImagePullPolicy("IfNotPresent") + .addToEnv(driverExtraClasspathEnv.toSeq: _*) + .addNewEnv() + .withName(ENV_DRIVER_MEMORY) + .withValue(driverContainerMemoryWithOverhead + "m") + .endEnv() + .addNewEnv() + .withName(ENV_DRIVER_MAIN_CLASS) + .withValue(mainClass) + .endEnv() + .addNewEnv() + .withName(ENV_DRIVER_ARGS) + .withValue(appArgs.mkString(" ")) + .endEnv() + .build() + val basePod = new PodBuilder() + .withNewMetadata() + .withName(kubernetesDriverPodName) + .addToLabels(allLabels.asJava) + .addToAnnotations(parsedCustomAnnotations.asJava) + .endMetadata() + .withNewSpec() + .withRestartPolicy("Never") + .addToContainers(driverContainer) + .endSpec() - val maybeSubmittedDependencyUploader = initContainerComponentsProvider + val maybeSubmittedDependencyUploader = initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(allLabels) - val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader => - 
SubmittedResources(uploader.uploadJars(), uploader.uploadFiles()) - } - val maybeSecretBuilder = initContainerComponentsProvider - .provideSubmittedDependenciesSecretBuilder( - maybeSubmittedResourceIdentifiers.map(_.secrets())) - val maybeSubmittedDependenciesSecret = maybeSecretBuilder.map(_.build()) - val initContainerConfigMap = initContainerComponentsProvider + val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader => + SubmittedResources(uploader.uploadJars(), uploader.uploadFiles()) + } + val maybeSecretBuilder = initContainerComponentsProvider + .provideSubmittedDependenciesSecretBuilder( + maybeSubmittedResourceIdentifiers.map(_.secrets())) + val maybeSubmittedDependenciesSecret = maybeSecretBuilder.map(_.build()) + val initContainerConfigMap = initContainerComponentsProvider .provideInitContainerConfigMapBuilder(maybeSubmittedResourceIdentifiers.map(_.ids())) .build() - val podWithInitContainer = initContainerComponentsProvider + val podWithInitContainer = initContainerComponentsProvider .provideInitContainerBootstrap() .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod) - val containerLocalizedFilesResolver = initContainerComponentsProvider - .provideContainerLocalizedFilesResolver() - val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() - val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() + val containerLocalizedFilesResolver = initContainerComponentsProvider + .provideContainerLocalizedFilesResolver() + val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() + val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() - val executorInitContainerConfiguration = initContainerComponentsProvider - .provideExecutorInitContainerConfiguration() - val sparkConfWithExecutorInit = executorInitContainerConfiguration - .configureSparkConfForExecutorInitContainer(sparkConf) - val credentialsMounter = kubernetesCredentialsMounterProvider - .getDriverPodKubernetesCredentialsMounter() - val credentialsSecret = credentialsMounter.createCredentialsSecret() - val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials( - podWithInitContainer, driverContainer.getName, credentialsSecret) - val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations( - sparkConfWithExecutorInit) - if (resolvedSparkJars.nonEmpty) { - resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(",")) - } - if (resolvedSparkFiles.nonEmpty) { - resolvedSparkConf.set("spark.files", resolvedSparkFiles.mkString(",")) - } - resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName) - resolvedSparkConf.set("spark.app.id", kubernetesAppId) - // We don't need this anymore since we just set the JVM options on the environment - resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) - resolvedSparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => - resolvedSparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN.key, "") - } - resolvedSparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => - resolvedSparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN.key, "") - } - val resolvedLocalClasspath = containerLocalizedFilesResolver - .resolveSubmittedAndRemoteSparkJars() - val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { - case (confKey, confValue) => s"-D$confKey=$confValue" - }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") - val 
resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() - .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) - .addNewEnv() - .withName(ENV_MOUNTED_CLASSPATH) - .withValue(resolvedLocalClasspath.mkString(File.pathSeparator)) - .endEnv() - .addNewEnv() - .withName(ENV_DRIVER_JAVA_OPTS) - .withValue(resolvedDriverJavaOpts) - .endEnv() - .endContainer() - .endSpec() - .build() - Utils.tryWithResource( - kubernetesClient - .pods() - .withName(resolvedDriverPod.getMetadata.getName) - .watch(loggingPodStatusWatcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { - val driverOwnedResources = Seq(initContainerConfigMap) ++ - maybeSubmittedDependenciesSecret.toSeq ++ - credentialsSecret.toSeq - val driverPodOwnerReference = new OwnerReferenceBuilder() - .withName(createdDriverPod.getMetadata.getName) - .withApiVersion(createdDriverPod.getApiVersion) - .withUid(createdDriverPod.getMetadata.getUid) - .withKind(createdDriverPod.getKind) - .withController(true) - .build() - driverOwnedResources.foreach { resource => - val originalMetadata = resource.getMetadata - originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) - } - kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace() - } catch { - case e: Throwable => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { - logInfo(s"Waiting for application $kubernetesAppId to finish...") - loggingPodStatusWatcher.awaitCompletion() - logInfo(s"Application $kubernetesAppId finished.") - } else { - logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.") + val executorInitContainerConfiguration = initContainerComponentsProvider + .provideExecutorInitContainerConfiguration() + val sparkConfWithExecutorInit = executorInitContainerConfiguration + .configureSparkConfForExecutorInitContainer(sparkConf) + val credentialsMounter = kubernetesCredentialsMounterProvider + .getDriverPodKubernetesCredentialsMounter() + val credentialsSecret = credentialsMounter.createCredentialsSecret() + val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials( + podWithInitContainer, driverContainer.getName, credentialsSecret) + val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations( + sparkConfWithExecutorInit) + if (resolvedSparkJars.nonEmpty) { + resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(",")) + } + if (resolvedSparkFiles.nonEmpty) { + resolvedSparkConf.set("spark.files", resolvedSparkFiles.mkString(",")) + } + resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName) + resolvedSparkConf.set("spark.app.id", kubernetesAppId) + // We don't need this anymore since we just set the JVM options on the environment + resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + val resolvedLocalClasspath = containerLocalizedFilesResolver + .resolveSubmittedAndRemoteSparkJars() + val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { + case (confKey, confValue) => s"-D$confKey=$confValue" + }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") + val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() + .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) + .addNewEnv() + .withName(ENV_MOUNTED_CLASSPATH) + .withValue(resolvedLocalClasspath.mkString(File.pathSeparator)) + .endEnv() + .addNewEnv() + 
.withName(ENV_DRIVER_JAVA_OPTS) + .withValue(resolvedDriverJavaOpts) + .endEnv() + .endContainer() + .endSpec() + .build() + Utils.tryWithResource( + kubernetesClient + .pods() + .withName(resolvedDriverPod.getMetadata.getName) + .watch(loggingPodStatusWatcher)) { _ => + val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + try { + val driverOwnedResources = Seq(initContainerConfigMap) ++ + maybeSubmittedDependenciesSecret.toSeq ++ + credentialsSecret.toSeq + val driverPodOwnerReference = new OwnerReferenceBuilder() + .withName(createdDriverPod.getMetadata.getName) + .withApiVersion(createdDriverPod.getApiVersion) + .withUid(createdDriverPod.getMetadata.getUid) + .withKind(createdDriverPod.getKind) + .withController(true) + .build() + driverOwnedResources.foreach { resource => + val originalMetadata = resource.getMetadata + originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) } + kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace() + } catch { + case e: Throwable => + kubernetesClient.pods().delete(createdDriverPod) + throw e + } + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + loggingPodStatusWatcher.awaitCompletion() + logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.") } } } @@ -268,27 +261,43 @@ private[spark] object Client { val appName = sparkConf.getOption("spark.app.name") .getOrElse("spark") val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") + val namespace = sparkConf.get(KUBERNETES_NAMESPACE) + val master = resolveK8sMaster(sparkConf.get("spark.master")) val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf) val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl( - sparkConf, kubernetesAppId, sparkJars, sparkFiles, sslOptionsProvider.getSslOptions) - val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf) - val kubernetesCredentialsMounterProvider = - new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId) - val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) - val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion) - val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) - new Client( - appName, - kubernetesAppId, - mainClass, - sparkConf, - appArgs, - sparkJars, - sparkFiles, - waitForAppCompletion, - kubernetesClientProvider, - initContainerComponentsProvider, - kubernetesCredentialsMounterProvider, - loggingPodStatusWatcher).run() + sparkConf, + kubernetesAppId, + namespace, + sparkJars, + sparkFiles, + sslOptionsProvider.getSslOptions) + Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( + master, + Some(namespace), + APISERVER_AUTH_SUBMISSION_CONF_PREFIX, + sparkConf, + None, + None)) { kubernetesClient => + val kubernetesCredentialsMounterProvider = + new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId) + val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) + val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)) + .filter( _ => waitForAppCompletion) + val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl( + kubernetesAppId, loggingInterval) + new Client( + appName, + kubernetesAppId, + mainClass, + sparkConf, + appArgs, + sparkJars, + sparkFiles, + 
waitForAppCompletion, + kubernetesClient, + initContainerComponentsProvider, + kubernetesCredentialsMounterProvider, + loggingPodStatusWatcher).run() + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index 7fbb0c9274bf5..ccb349c5b2988 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.kubernetes.submit +import java.io.File + import org.apache.spark.{SparkConf, SSLOptions} import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ @@ -46,6 +48,7 @@ private[spark] trait DriverInitContainerComponentsProvider { private[spark] class DriverInitContainerComponentsProviderImpl( sparkConf: SparkConf, kubernetesAppId: String, + namespace: String, sparkJars: Seq[String], sparkFiles: Seq[String], resourceStagingServerExternalSslOptions: SSLOptions) @@ -98,7 +101,6 @@ private[spark] class DriverInitContainerComponentsProviderImpl( private val maybeSecretName = maybeResourceStagingServerUri.map { _ => s"$kubernetesAppId-init-secret" } - private val namespace = sparkConf.get(KUBERNETES_NAMESPACE) private val configMapName = s"$kubernetesAppId-init-config" private val configMapKey = s"$kubernetesAppId-init-config-key" private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounter.scala index ded0237732ce0..b13800f389605 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounter.scala @@ -53,41 +53,50 @@ private[spark] trait DriverPodKubernetesCredentialsMounter { } private[spark] class DriverPodKubernetesCredentialsMounterImpl( - kubernetesAppId: String, - submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials, - maybeUserSpecifiedMountedClientKeyFile: Option[String], - maybeUserSpecifiedMountedClientCertFile: Option[String], - maybeUserSpecifiedMountedOAuthTokenFile: Option[String], - maybeUserSpecifiedMountedCaCertFile: Option[String]) + kubernetesAppId: String, + submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials, + maybeUserSpecifiedMountedClientKeyFile: Option[String], + maybeUserSpecifiedMountedClientCertFile: Option[String], + maybeUserSpecifiedMountedOAuthTokenFile: Option[String], + maybeUserSpecifiedMountedCaCertFile: Option[String]) extends DriverPodKubernetesCredentialsMounter { override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = { val resolvedMountedClientKeyFile = resolveSecretLocation( - 
maybeUserSpecifiedMountedClientKeyFile, - submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64, - DRIVER_CREDENTIALS_CLIENT_KEY_PATH) + maybeUserSpecifiedMountedClientKeyFile, + submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64, + DRIVER_CREDENTIALS_CLIENT_KEY_PATH) val resolvedMountedClientCertFile = resolveSecretLocation( - maybeUserSpecifiedMountedClientCertFile, - submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64, - DRIVER_CREDENTIALS_CLIENT_CERT_PATH) + maybeUserSpecifiedMountedClientCertFile, + submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64, + DRIVER_CREDENTIALS_CLIENT_CERT_PATH) val resolvedMountedCaCertFile = resolveSecretLocation( - maybeUserSpecifiedMountedCaCertFile, - submitterLocalDriverPodKubernetesCredentials.caCertDataBase64, - DRIVER_CREDENTIALS_CA_CERT_PATH) + maybeUserSpecifiedMountedCaCertFile, + submitterLocalDriverPodKubernetesCredentials.caCertDataBase64, + DRIVER_CREDENTIALS_CA_CERT_PATH) val resolvedMountedOAuthTokenFile = resolveSecretLocation( - maybeUserSpecifiedMountedOAuthTokenFile, - submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64, - DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH) + maybeUserSpecifiedMountedOAuthTokenFile, + submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64, + DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH) val sparkConfWithCredentialLocations = sparkConf.clone() - .setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile) - .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile) - .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile) - .setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile) - sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => - sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "") - } - sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => - sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") + .setOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", + resolvedMountedCaCertFile) + .setOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", + resolvedMountedClientKeyFile) + .setOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", + resolvedMountedClientCertFile) + .setOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX", + resolvedMountedOAuthTokenFile) + // Redact all OAuth token values + sparkConfWithCredentialLocations + .getAll + .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1) + .foreach { + sparkConfWithCredentialLocations.set(_, "") } sparkConfWithCredentialLocations } @@ -141,9 +150,9 @@ private[spark] class DriverPodKubernetesCredentialsMounterImpl( } private def resolveSecretLocation( - mountedUserSpecified: Option[String], - valueMountedFromSubmitter: Option[String], - mountedCanonicalLocation: String): Option[String] = { + mountedUserSpecified: Option[String], + valueMountedFromSubmitter: Option[String], + mountedCanonicalLocation: String): Option[String] = { mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => { mountedCanonicalLocation })) @@ -167,7 +176,7 @@ private[spark] class DriverPodKubernetesCredentialsMounterImpl( } private class OptionSettableSparkConf(sparkConf: SparkConf) { - def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = { + def setOption(configEntry: 
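// Standalone illustration of the redaction step above: every conf key ending in
// "oauthToken" is overwritten with an empty value before the conf is handed to the driver
// pod, so raw token values never appear in the pod's system properties. The key below is
// illustrative.
import org.apache.spark.SparkConf

val confToRedact = new SparkConf(false)
  .set("spark.kubernetes.authenticate.driver.oauthToken", "secret-token")
confToRedact.getAll
  .map(_._1)
  .filter(_.endsWith("oauthToken"))
  .foreach(key => confToRedact.set(key, ""))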
String, option: Option[String]): SparkConf = { option.map( opt => { sparkConf.set(configEntry, opt) }).getOrElse(sparkConf) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterProvider.scala index 3f0e7d97275a5..913279198146a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterProvider.scala @@ -37,9 +37,13 @@ private[spark] class DriverPodKubernetesCredentialsMounterProviderImpl( new DriverPodKubernetesCredentialsMounterImpl( kubernetesAppId, submitterLocalDriverPodKubernetesCredentials, - sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE), - sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE), - sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN), - sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE)) + sparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX"), + sparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX"), + sparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX"), + sparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala index 404741520c059..41b0cf8ceaeab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala @@ -29,25 +29,20 @@ import org.apache.spark.internal.config.OptionalConfigEntry private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { def get(): KubernetesCredentials = { - sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ => - require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty, - "Cannot specify both a service account and a driver pod OAuth token.") - require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty, - "Cannot specify both a service account and a driver pod CA cert file.") - require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty, - "Cannot specify both a service account and a driver pod client key file.") - require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty, - "Cannot specify both a service account and a driver pod client cert file.") - } - val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token => + val oauthTokenBase64 = sparkConf + .getOption(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") + .map { token => BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8)) } - val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE, - s"Driver CA cert file provided at %s does not exist or is not a file.") - val clientKeyDataBase64 = 
safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE, - s"Driver client key file provided at %s does not exist or is not a file.") - val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE, - s"Driver client cert file provided at %s does not exist or is not a file.") + val caCertDataBase64 = safeFileConfToBase64( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", + s"Driver CA cert file provided at %s does not exist or is not a file.") + val clientKeyDataBase64 = safeFileConfToBase64( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", + s"Driver client key file provided at %s does not exist or is not a file.") + val clientCertDataBase64 = safeFileConfToBase64( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", + s"Driver client cert file provided at %s does not exist or is not a file.") KubernetesCredentials( oauthTokenBase64 = oauthTokenBase64, caCertDataBase64 = caCertDataBase64, @@ -56,9 +51,9 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf } private def safeFileConfToBase64( - conf: OptionalConfigEntry[String], + conf: String, fileNotFoundFormatString: String): Option[String] = { - sparkConf.get(conf) + sparkConf.getOption(conf) .map(new File(_)) .map { file => require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmissionKubernetesClientProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmissionKubernetesClientProvider.scala deleted file mode 100644 index 17b61d4a6ace0..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmissionKubernetesClientProvider.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
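// A compact sketch of what safeFileConfToBase64 above does for each credential path:
// verify the path points at a regular file, then base64-encode its raw bytes. BaseEncoding
// is already used in this provider for the OAuth token value; Files.toByteArray is assumed
// available from Guava.
import java.io.File
import com.google.common.io.{BaseEncoding, Files}

def fileToBase64Sketch(path: String): String = {
  val file = new File(path)
  require(file.isFile, s"File provided at $path does not exist or is not a file.")
  BaseEncoding.base64().encode(Files.toByteArray(file))
}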
- */ -package org.apache.spark.deploy.kubernetes.submit - -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.internal.Logging - -trait SubmissionKubernetesClientProvider { - def get: KubernetesClient -} - -private[spark] class SubmissionKubernetesClientProviderImpl(sparkConf: SparkConf) - extends SubmissionKubernetesClientProvider with Logging { - - private val namespace = sparkConf.get(KUBERNETES_NAMESPACE) - private val master = resolveK8sMaster(sparkConf.get("spark.master")) - - override def get: KubernetesClient = { - var k8ConfBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(master) - .withNamespace(namespace) - sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach { - f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) - } - sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach { - f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) - } - sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach { - f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) - } - sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token => - k8ConfBuilder = k8ConfBuilder.withOauthToken(token) - } - val k8ClientConfig = k8ConfBuilder.build - new DefaultKubernetesClient(k8ClientConfig) - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala index 9d0d863d174bc..a891cf3904d2d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala @@ -21,12 +21,14 @@ import javax.ws.rs.core.MediaType import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} import okhttp3.RequestBody import retrofit2.Call import org.apache.spark.{SparkException, SSLOptions} -import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials} -import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServiceRetrofit, RetrofitClientFactory} +import org.apache.spark.deploy.kubernetes.CompressionUtils +import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServiceRetrofit, RetrofitClientFactory, StagedResourcesOwner, StagedResourcesOwnerType} import org.apache.spark.util.Utils private[spark] trait SubmittedDependencyUploader { @@ -76,29 +78,23 @@ private[spark] class SubmittedDependencyUploaderImpl( Utils.tryWithResource(new FileOutputStream(filesTgz)) { filesOutputStream => CompressionUtils.writeTarGzipToStream(filesOutputStream, files.map(_.getAbsolutePath)) } - // TODO provide credentials properly when the staging server monitors the Kubernetes API. 
- val kubernetesCredentialsString = OBJECT_MAPPER.writer() - .writeValueAsString(KubernetesCredentials(None, None, None, None)) - val labelsAsString = OBJECT_MAPPER.writer().writeValueAsString(podLabels) + val stagedResourcesOwner = StagedResourcesOwner( + ownerNamespace = podNamespace, + ownerLabels = podLabels, + ownerType = StagedResourcesOwnerType.Pod) + val stagedResourcesOwnerString = OBJECT_MAPPER.writeValueAsString(stagedResourcesOwner) + val stagedResourcesOwnerBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), stagedResourcesOwnerString) val filesRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesTgz) - - val kubernetesCredentialsBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) - - val namespaceRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.TEXT_PLAIN), podNamespace) - - val labelsRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), labelsAsString) + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesTgz) val service = retrofitClientFactory.createRetrofitClient( stagingServerUri, classOf[ResourceStagingServiceRetrofit], stagingServiceSslOptions) val uploadResponse = service.uploadResources( - labelsRequestBody, namespaceRequestBody, filesRequestBody, kubernetesCredentialsBody) + resources = filesRequestBody, resourcesOwner = stagedResourcesOwnerBody) getTypedResponseResult(uploadResponse) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkDependencyDownloadInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkDependencyDownloadInitContainer.scala index 9bdc224f10c90..ac19c2463218b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkDependencyDownloadInitContainer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkDependencyDownloadInitContainer.scala @@ -204,13 +204,6 @@ private class FileFetcherImpl(sparkConf: SparkConf, securityManager: SparkSecuri } } -private case class StagedResources( - resourceSecret: String, - podLabels: Map[String, String], - podNamespace: String, - resourcesFile: File, - kubernetesCredentials: KubernetesCredentials) - object KubernetesSparkDependencyDownloadInitContainer extends Logging { def main(args: Array[String]): Unit = { logInfo("Starting init-container to download Spark application dependencies.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServer.scala index 34594ba518b62..0b97317eba8b1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServer.scala @@ -21,6 +21,7 @@ import java.io.File import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.fabric8.kubernetes.client.Config import org.eclipse.jetty.http.HttpVersion import org.eclipse.jetty.server.{HttpConfiguration, 
HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} @@ -30,9 +31,10 @@ import org.glassfish.jersey.server.ResourceConfig import org.glassfish.jersey.servlet.ServletContainer import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.SparkKubernetesClientFactory import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.{SystemClock, ThreadUtils, Utils} private[spark] class ResourceStagingServer( port: Int, @@ -98,8 +100,33 @@ object ResourceStagingServer { } else { new SparkConf(true) } + val apiServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_API_SERVER_URL) + val initialAccessExpirationMs = sparkConf.get( + RESOURCE_STAGING_SERVER_INITIAL_ACCESS_EXPIRATION_TIMEOUT) val dependenciesRootDir = Utils.createTempDir(namePrefix = "local-application-dependencies") - val serviceInstance = new ResourceStagingServiceImpl(dependenciesRootDir) + val useServiceAccountCredentials = sparkConf.get( + RESOURCE_STAGING_SERVER_USE_SERVICE_ACCOUNT_CREDENTIALS) + // Namespace doesn't matter because we list resources from various namespaces + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( + apiServerUri, + None, + APISERVER_AUTH_RESOURCE_STAGING_SERVER_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)) + .filter( _ => useServiceAccountCredentials), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)) + .filter( _ => useServiceAccountCredentials)) + + val stagedResourcesStore = new StagedResourcesStoreImpl(dependenciesRootDir) + val stagedResourcesCleaner = new StagedResourcesCleanerImpl( + stagedResourcesStore, + kubernetesClient, + ThreadUtils.newDaemonSingleThreadScheduledExecutor("resource-expiration"), + new SystemClock(), + initialAccessExpirationMs) + stagedResourcesCleaner.start() + val serviceInstance = new ResourceStagingServiceImpl( + stagedResourcesStore, stagedResourcesCleaner) val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf) val server = new ResourceStagingServer( port = sparkConf.get(RESOURCE_STAGING_SERVER_PORT), diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingService.scala index 525711e78c01c..b9d283a99ade9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingService.scala @@ -52,13 +52,12 @@ private[spark] trait ResourceStagingService { * The tarball should contain the files laid out in a flat hierarchy, without * any directories. We take a stream here to avoid holding these entirely in * memory. - * @param podLabels Labels of pods to monitor. When no more pods are running with the given label, - * after some period of time, these dependencies will be cleared. - * @param podNamespace Namespace of pods to monitor. - * @param kubernetesCredentials These credentials are primarily used to monitor the progress of - * the application. When the application shuts down normally, shuts - * down abnormally and does not restart, or fails to start entirely, - * the data uploaded through this endpoint is cleared. 
+ * @param resourcesOwner A description of the "owner" of a resource. A resource owner is a + * Kubernetes API object in a given namespace, with a specific set of + * labels. When there are no resources of the owner's type in the given + * namespace with the given labels, the resources are cleaned up. The + * resource staging server monitors the owner's state over time using the + * credentials it is configured with; the owner bundle itself carries no + * credentials. * @return A unique token that should be provided when retrieving these dependencies later. */ @POST @@ -66,10 +65,8 @@ @Produces(Array(MediaType.APPLICATION_JSON)) @Path("/resources") def uploadResources( - @FormDataParam("podLabels") podLabels: Map[String, String], - @FormDataParam("podNamespace") podNamespace: String, - @FormDataParam("resources") resources: InputStream, - @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) + @FormDataParam("resources") resources: InputStream, + @FormDataParam("resourcesOwner") resourcesOwner: StagedResourcesOwner) : SubmittedResourceIdAndSecret /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImpl.scala index abe956da9914d..7bc21c21619e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImpl.scala @@ -31,58 +31,28 @@ import org.apache.spark.deploy.kubernetes.submit.SubmittedResourceIdAndSecret import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) +private[spark] class ResourceStagingServiceImpl( + stagedResourcesStore: StagedResourcesStore, + stagedResourcesCleaner: StagedResourcesCleaner) extends ResourceStagingService with Logging { - private val SECURE_RANDOM = new SecureRandom() - // TODO clean up these resources based on the driver's lifecycle - private val stagedResources = TrieMap.empty[String, StagedResources] - override def uploadResources( - podLabels: Map[String, String], - podNamespace: String, resources: InputStream, - kubernetesCredentials: KubernetesCredentials): SubmittedResourceIdAndSecret = { - val resourceId = UUID.randomUUID().toString - val secretBytes = new Array[Byte](1024) - SECURE_RANDOM.nextBytes(secretBytes) - val resourceSecret = resourceId + "-" + BaseEncoding.base64().encode(secretBytes) - - val namespaceDir = new File(dependenciesRootDir, podNamespace) - val resourcesDir = new File(namespaceDir, resourceId) - try { - if (!resourcesDir.exists()) { - if (!resourcesDir.mkdirs()) { - throw new SparkException("Failed to create dependencies directory for application" + - s" at ${resourcesDir.getAbsolutePath}") - } - } - // TODO encrypt the written data with the secret.
- val resourcesTgz = new File(resourcesDir, "resources.data") - Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } - stagedResources(resourceId) = StagedResources( - resourceSecret, - podLabels, - podNamespace, - resourcesTgz, - kubernetesCredentials) - SubmittedResourceIdAndSecret(resourceId, resourceSecret) - } catch { - case e: Throwable => - if (!resourcesDir.delete()) { - logWarning(s"Failed to delete application directory $resourcesDir.") - } - throw e - } + resourcesOwner: StagedResourcesOwner): SubmittedResourceIdAndSecret = { + val stagedResources = stagedResourcesStore.addResources( + resourcesOwner.ownerNamespace, resources) + stagedResourcesCleaner.registerResourceForCleaning( + stagedResources.resourceId, resourcesOwner) + SubmittedResourceIdAndSecret(stagedResources.resourceId, stagedResources.resourceSecret) } override def downloadResources(resourceId: String, resourceSecret: String): StreamingOutput = { - val resource = stagedResources - .get(resourceId) + val resource = stagedResourcesStore.getResources(resourceId) .getOrElse(throw new NotFoundException(s"No resource bundle found with id $resourceId")) if (!resource.resourceSecret.equals(resourceSecret)) { throw new NotAuthorizedException(s"Unauthorized to download resource with id $resourceId") } + stagedResourcesCleaner.markResourceAsUsed(resourceId) new StreamingOutput { override def write(outputStream: OutputStream) = { Files.copy(resource.resourcesFile, outputStream) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceRetrofit.scala index c0da44838aba3..5fbf0f9c43970 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceRetrofit.scala @@ -31,11 +31,9 @@ private[spark] trait ResourceStagingServiceRetrofit { @Multipart @retrofit2.http.POST("api/v0/resources/") def uploadResources( - @retrofit2.http.Part("podLabels") podLabels: RequestBody, - @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @retrofit2.http.Part("resources") resources: RequestBody, - @retrofit2.http.Part("kubernetesCredentials") - kubernetesCredentials: RequestBody): Call[SubmittedResourceIdAndSecret] + @retrofit2.http.Part("resourcesOwner") resourcesOwner: RequestBody) + : Call[SubmittedResourceIdAndSecret] @Streaming @retrofit2.http.GET("api/v0/resources/{resourceId}") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResources.scala new file mode 100644 index 0000000000000..81f394800f803 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResources.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.io.File + +case class StagedResources( + resourceId: String, + resourceSecret: String, + resourcesFile: File) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleaner.scala new file mode 100644 index 0000000000000..5d9db728483fa --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleaner.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +private[spark] trait StagedResourcesCleaner { + + def start(): Unit + + def registerResourceForCleaning( + resourceId: String, stagedResourceOwner: StagedResourcesOwner): Unit + + def markResourceAsUsed(resourceId: String): Unit +} + +private class StagedResourcesCleanerImpl( + stagedResourcesStore: StagedResourcesStore, + kubernetesClient: KubernetesClient, + cleanupExecutorService: ScheduledExecutorService, + clock: Clock, + initialAccessExpirationMs: Long) + extends StagedResourcesCleaner { + + private val CLEANUP_INTERVAL_MS = 30000 + private val RESOURCE_LOCK = new Object() + private val activeResources = mutable.Map.empty[String, MonitoredResource] + private val unusedResources = mutable.Map.empty[String, UnusedMonitoredResource] + + override def start(): Unit = { + cleanupExecutorService.scheduleAtFixedRate( + new CleanupRunnable(), + CLEANUP_INTERVAL_MS, + CLEANUP_INTERVAL_MS, + TimeUnit.MILLISECONDS) + } + + override def registerResourceForCleaning( + resourceId: String, stagedResourceOwner: StagedResourcesOwner): Unit = { + RESOURCE_LOCK.synchronized { + unusedResources(resourceId) = UnusedMonitoredResource( + clock.getTimeMillis() + initialAccessExpirationMs, + MonitoredResource(resourceId, stagedResourceOwner)) + + } + } + + override def markResourceAsUsed(resourceId: String): Unit = RESOURCE_LOCK.synchronized { + val resource = unusedResources.remove(resourceId) + resource.foreach { res => + activeResources(resourceId) = res.resource + } + } + + private class CleanupRunnable extends Runnable with Logging { + + override def run(): Unit = { + // Make a copy so we can iterate through this while modifying + val activeResourcesCopy = RESOURCE_LOCK.synchronized { + Map.apply(activeResources.toSeq: _*) + } + for ((resourceId, resource) <- activeResourcesCopy) { + val namespace = kubernetesClient.namespaces() + .withName(resource.stagedResourceOwner.ownerNamespace) + .get() + if (namespace == null) { + logInfo(s"Resource files with id $resourceId are being removed. The owner's namespace" + + s" ${resource.stagedResourceOwner.ownerNamespace} was not found.") + stagedResourcesStore.removeResources(resourceId) + RESOURCE_LOCK.synchronized { + activeResources.remove(resourceId) + } + } else { + val metadataOperation = resource.stagedResourceOwner.ownerType match { + case StagedResourcesOwnerType.Pod => + kubernetesClient.pods().inNamespace(resource.stagedResourceOwner.ownerNamespace) + case _ => + throw new SparkException(s"Unsupported resource owner type for cleanup:" + + s" ${resource.stagedResourceOwner.ownerType}") + } + if (metadataOperation + .withLabels(resource.stagedResourceOwner.ownerLabels.asJava) + .list() + .getItems + .isEmpty) { + logInfo(s"Resource files with id $resourceId are being removed.
Owners of the" + + s" resource with namespace: ${resource.stagedResourceOwner.ownerNamespace}," + + s" type: ${resource.stagedResourceOwner.ownerType}, and labels:" + + s" ${resource.stagedResourceOwner.ownerLabels} were not found on the API server.") + stagedResourcesStore.removeResources(resourceId) + RESOURCE_LOCK.synchronized { + activeResources.remove(resourceId) + } + } + } + } + + // Make a copy so we can iterate through this while modifying + val unusedResourcesCopy = RESOURCE_LOCK.synchronized { + Map.apply(unusedResources.toSeq: _*) + } + + for ((resourceId, resource) <- unusedResourcesCopy) { + if (resource.expiresAt < clock.getTimeMillis()) { + RESOURCE_LOCK.synchronized { + // Check for existence again here (via foreach) because the resource may have been + // marked as active after we took the snapshot of unused resources, in which case it + // should not be removed. + unusedResources.remove(resourceId).foreach { _ => + logInfo(s"Resources with id $resourceId were not accessed after being added to" + + s" the staging server at least $initialAccessExpirationMs ms ago. The resources" + + s" will be deleted.") + stagedResourcesStore.removeResources(resourceId) + } + } + } + } + } + } + + private case class MonitoredResource( + resourceId: String, + stagedResourceOwner: StagedResourcesOwner) + + private case class UnusedMonitoredResource(expiresAt: Long, resource: MonitoredResource) +} + + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesOwner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesOwner.scala new file mode 100644 index 0000000000000..4061bc36764d7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesOwner.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration + +object StagedResourcesOwnerType extends Enumeration { + type OwnerType = Value + // In more generic scenarios, we might want to be watching Deployments, etc.
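+ // Only pods can currently own staged resources; the cleaner rejects any other owner type.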
+ val Pod = Value +} + +class StagedResourcesOwnerTypeReference extends TypeReference[StagedResourcesOwnerType.type] + +case class StagedResourcesOwner( + ownerNamespace: String, + ownerLabels: Map[String, String], + @JsonScalaEnumeration(classOf[StagedResourcesOwnerTypeReference]) + ownerType: StagedResourcesOwnerType.OwnerType) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStore.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStore.scala new file mode 100644 index 0000000000000..0c0d428e035dc --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStore.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.io.{File, FileOutputStream, InputStream, IOException} +import java.security.SecureRandom +import java.util.UUID + +import com.google.common.io.{BaseEncoding, ByteStreams} +import org.apache.commons.io.FileUtils +import scala.collection.concurrent.TrieMap + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + + +private[spark] trait StagedResourcesStore { + + /** + * Store the given stream on disk and return its resource ID and secret. + */ + def addResources( + podNamespace: String, + resources: InputStream): StagedResources + + /** + * Retrieve a resource bundle with the given id. Returns empty if no resources match this id. + */ + def getResources(resourceId: String): Option[StagedResources] + + def removeResources(resourceId: String): Unit +} + +private[spark] class StagedResourcesStoreImpl(dependenciesRootDir: File) + extends StagedResourcesStore with Logging { + + private val SECURE_RANDOM = new SecureRandom() + private val stagedResources = TrieMap.empty[String, StagedResources] + + override def addResources( + podNamespace: String, + resources: InputStream): StagedResources = { + val resourceId = UUID.randomUUID().toString + val secretBytes = new Array[Byte](1024) + SECURE_RANDOM.nextBytes(secretBytes) + val resourceSecret = resourceId + "-" + BaseEncoding.base64().encode(secretBytes) + + val namespaceDir = new File(dependenciesRootDir, podNamespace) + val resourcesDir = new File(namespaceDir, resourceId) + try { + if (!resourcesDir.exists()) { + if (!resourcesDir.mkdirs()) { + throw new SparkException("Failed to create dependencies directory for application" + + s" at ${resourcesDir.getAbsolutePath}") + } + } + // TODO encrypt the written data with the secret. 
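+ // The uploaded tarball is currently written unencrypted to <namespace>/<resourceId>/resources.data under the dependencies root directory.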
+ val resourcesFile = new File(resourcesDir, "resources.data") + Utils.tryWithResource(new FileOutputStream(resourcesFile)) { + ByteStreams.copy(resources, _) + } + val resourceBundle = StagedResources(resourceId, resourceSecret, resourcesFile) + stagedResources(resourceId) = resourceBundle + resourceBundle + } catch { + case e: Throwable => + if (!resourcesDir.delete()) { + logWarning(s"Failed to delete application directory $resourcesDir.") + } + stagedResources.remove(resourceId) + throw e + } + } + + override def getResources(resourceId: String): Option[StagedResources] = { + stagedResources.get(resourceId) + } + + override def removeResources(resourceId: String): Unit = { + stagedResources.remove(resourceId) + .map(_.resourcesFile.getParentFile) + .foreach { resourcesDirectory => + try { + FileUtils.deleteDirectory(resourcesDirectory) + } catch { + case e: IOException => + logWarning(s"Failed to delete resources directory" + + s" at ${resourcesDirectory.getAbsolutePath}", e) + } + } + } +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala deleted file mode 100644 index cc2032219f885..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.scheduler.cluster.kubernetes - -import java.io.File - -import com.google.common.base.Charsets -import com.google.common.io.Files -import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} -import io.fabric8.kubernetes.client.utils.HttpClientUtils -import okhttp3.Dispatcher - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.util.ThreadUtils - -private[spark] class DriverPodKubernetesClientProvider( - sparkConf: SparkConf, - namespace: Option[String] = None) { - - private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) - private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) - private val caCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) - private val clientKeyFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) - private val clientCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) - - /** - * Creates a {@link KubernetesClient}, expecting to be from within the context of a pod. When - * doing so, service account token files can be picked up from canonical locations. - */ - def get: DefaultKubernetesClient = { - val baseClientConfigBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) - - // Build a namespaced client if specified. - val namespacedClientConfigBuilder = namespace - .map(baseClientConfigBuilder.withNamespace(_)).getOrElse(baseClientConfigBuilder) - - val configBuilder = oauthTokenFile - .orElse(caCertFile) - .orElse(clientKeyFile) - .orElse(clientCertFile) - .map { _ => - var mountedAuthConfigBuilder = baseClientConfigBuilder - oauthTokenFile.foreach { tokenFilePath => - val tokenFile = new File(tokenFilePath) - mountedAuthConfigBuilder = mountedAuthConfigBuilder - .withOauthToken(Files.toString(tokenFile, Charsets.UTF_8)) - } - caCertFile.foreach { caFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withCaCertFile(caFile) - } - clientKeyFile.foreach { keyFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientKeyFile(keyFile) - } - clientCertFile.foreach { certFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientCertFile(certFile) - } - mountedAuthConfigBuilder - }.getOrElse { - var serviceAccountConfigBuilder = baseClientConfigBuilder - if (SERVICE_ACCOUNT_CA_CERT.isFile) { - serviceAccountConfigBuilder = serviceAccountConfigBuilder.withCaCertFile( - SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) - } - - if (SERVICE_ACCOUNT_TOKEN.isFile) { - serviceAccountConfigBuilder = serviceAccountConfigBuilder.withOauthToken( - Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) - } - serviceAccountConfigBuilder - } - // Disable the ping thread that is not daemon, in order to allow - // the driver main thread to shut down upon errors. Otherwise, the driver - // will hang indefinitely. - val config = configBuilder - .withWebsocketPingInterval(0) - .build() - val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() - // Use a Dispatcher with a custom executor service that creates daemon threads. The default - // executor service used by Dispatcher creates non-daemon threads. 
- .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))) - .build() - new DefaultKubernetesClient(httpClient, config) - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index e2630b9918b61..6abce55cff209 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -16,9 +16,14 @@ */ package org.apache.spark.scheduler.cluster.kubernetes +import java.io.File + +import io.fabric8.kubernetes.client.Config + import org.apache.spark.SparkContext -import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} @@ -75,8 +80,15 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit logWarning("The executor's init-container config map key was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( + KUBERNETES_MASTER_INTERNAL_URL, + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) new KubernetesClusterSchedulerBackend( - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, bootStrap) + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, bootStrap, kubernetesClient) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 257cee80fdea9..1852ed021d91a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils import scala.collection.JavaConverters._ @@ -43,7 +43,8 @@ import 
org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, val sc: SparkContext, - executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap]) + executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], + kubernetesClient: KubernetesClient) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { import KubernetesClusterSchedulerBackend._ @@ -102,9 +103,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - private val kubernetesClient = new DriverPodKubernetesClientProvider(conf, - Some(kubernetesNamespace)).get - private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). withName(kubernetesDriverPodName).get() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index ff6c710117318..00f09c64b53b7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -37,7 +37,6 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient -import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private val JARS_RESOURCE = SubmittedResourceIdAndSecret("jarsId", "jarsSecret") @@ -131,8 +130,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { @Mock private var initContainerComponentsProvider: DriverInitContainerComponentsProvider = _ @Mock - private var kubernetesClientProvider: SubmissionKubernetesClientProvider = _ - @Mock private var kubernetesClient: KubernetesClient = _ @Mock private var podOps: MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _ @@ -174,7 +171,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .thenReturn(INIT_CONTAINER_SECRET) when(initContainerConfigMapBuilder.build()) .thenReturn(INIT_CONTAINER_CONFIG_MAP) - when(kubernetesClientProvider.get).thenReturn(kubernetesClient) when(kubernetesClient.pods()).thenReturn(podOps) when(podOps.create(any())).thenAnswer(new Answer[Pod] { override def answer(invocation: InvocationOnMock): Pod = { @@ -302,37 +298,13 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { SPARK_JARS, SPARK_FILES, true, - kubernetesClientProvider, + kubernetesClient, initContainerComponentsProvider, credentialsMounterProvider, loggingPodStatusWatcher).run() verify(loggingPodStatusWatcher).awaitCompletion() } - test("Run kubernetes shuffle service.") { - expectationsForNoMountedCredentials() - expectationsForNoDependencyUploader() - - val shuffleService = new KubernetesExternalShuffleService( - SPARK_CONF, - new SecurityManager(SPARK_CONF), - new DriverPodKubernetesClientProvider(SPARK_CONF)) - - val shuffleClient = new KubernetesExternalShuffleClient( - SparkTransportConf.fromSparkConf(SPARK_CONF, "shuffle"), - new SecurityManager(SPARK_CONF), - false, - false) - 
- shuffleService.start() - shuffleClient.init("newapp") - - // verifies that we can connect to the shuffle service and send - // it a message. - shuffleClient.registerDriverWithShuffleService("localhost", 7337) - shuffleService.stop() - } - private def expectationsForNoDependencyUploader(): Unit = { when(initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) @@ -409,7 +381,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { SPARK_JARS, SPARK_FILES, false, - kubernetesClientProvider, + kubernetesClient, initContainerComponentsProvider, credentialsMounterProvider, loggingPodStatusWatcher).run() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterSuite.scala index c1005a176408c..2e0a7ba5098b2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsMounterSuite.scala @@ -111,13 +111,17 @@ class DriverPodKubernetesCredentialsMounterSuite val baseSparkConf = new SparkConf() val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations(baseSparkConf) - assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) === + assert(resolvedSparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") === expectedClientKeyFile) - assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) === + assert(resolvedSparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") === expectedClientCertFile) - assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) === + assert(resolvedSparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") === expectedCaCertFile) - assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) === + assert(resolvedSparkConf.getOption( + s"$APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") === expectedOAuthTokenFile) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala new file mode 100644 index 0000000000000..0de1955884c8e --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesExternalShuffleServiceSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.KubernetesExternalShuffleService +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient + +private[spark] class KubernetesExternalShuffleServiceSuite extends SparkFunSuite { + + private val SPARK_CONF = new SparkConf() + .set(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS, false) + + test("Run kubernetes shuffle service.") { + val shuffleService = new KubernetesExternalShuffleService( + SPARK_CONF, + new SecurityManager(SPARK_CONF)) + + val shuffleClient = new KubernetesExternalShuffleClient( + SparkTransportConf.fromSparkConf(SPARK_CONF, "shuffle"), + new SecurityManager(SPARK_CONF), + false, + false) + + shuffleService.start() + shuffleClient.init("newapp") + + // verifies that we can connect to the shuffle service and send + // it a message. + shuffleClient.registerDriverWithShuffleService("localhost", 7337) + shuffleService.stop() + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala index 8693ff4e15372..c207e3c69cd3c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala @@ -22,26 +22,24 @@ import java.util.UUID import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.base.Charsets -import com.google.common.io.Files +import com.google.common.io.{BaseEncoding, Files} import okhttp3.RequestBody import okio.Okio -import org.mockito.Matchers.any -import org.mockito.Mockito -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer +import org.mockito.{ArgumentCaptor, Mockito} import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar._ import retrofit2.{Call, Response} import org.apache.spark.{SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.CompressionUtils -import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServiceRetrofit, RetrofitClientFactory} +import org.apache.spark.deploy.rest.kubernetes.{ResourceStagingServiceRetrofit, RetrofitClientFactory, StagedResourcesOwner} import org.apache.spark.util.Utils private[spark] class SubmittedDependencyUploaderSuite extends SparkFunSuite with BeforeAndAfter { import SubmittedDependencyUploaderSuite.createTempFile private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + private val BASE_64 = BaseEncoding.base64() private val APP_ID = "app-id" private val LABELS = Map("label1" -> "label1value", "label2" 
-> "label2value") private val NAMESPACE = "namespace" @@ -61,18 +59,31 @@ private[spark] class SubmittedDependencyUploaderSuite extends SparkFunSuite with trustStore = Some(TRUSTSTORE_FILE), trustStorePassword = Some(TRUSTSTORE_PASSWORD), trustStoreType = Some(TRUSTSTORE_TYPE)) + private val CLIENT_KEY_FILE = createTempFile("pem") + private val CLIENT_CERT_FILE = createTempFile("pem") + private val OAUTH_TOKEN = "token" private var retrofitClientFactory: RetrofitClientFactory = _ private var retrofitClient: ResourceStagingServiceRetrofit = _ + private var resourcesOwnerCaptor: ArgumentCaptor[RequestBody] = _ + private var resourcesDataCaptor: ArgumentCaptor[RequestBody] = _ private var dependencyUploaderUnderTest: SubmittedDependencyUploader = _ before { + resourcesOwnerCaptor = ArgumentCaptor.forClass(classOf[RequestBody]) + resourcesDataCaptor = ArgumentCaptor.forClass(classOf[RequestBody]) retrofitClientFactory = mock[RetrofitClientFactory] retrofitClient = mock[ResourceStagingServiceRetrofit] Mockito.when( retrofitClientFactory.createRetrofitClient( STAGING_SERVER_URI, classOf[ResourceStagingServiceRetrofit], STAGING_SERVER_SSL_OPTIONS)) .thenReturn(retrofitClient) + val responseCall = mock[Call[SubmittedResourceIdAndSecret]] + Mockito.when(responseCall.execute()).thenReturn( + Response.success(SubmittedResourceIdAndSecret("resourceId", "resourceSecret"))) + Mockito.when(retrofitClient.uploadResources( + resourcesDataCaptor.capture(), resourcesOwnerCaptor.capture())) + .thenReturn(responseCall) dependencyUploaderUnderTest = new SubmittedDependencyUploaderImpl( APP_ID, LABELS, @@ -85,38 +96,24 @@ private[spark] class SubmittedDependencyUploaderSuite extends SparkFunSuite with } test("Uploading jars should contact the staging server with the appropriate parameters") { - val capturingArgumentsAnswer = new UploadDependenciesArgumentsCapturingAnswer( - SubmittedResourceIdAndSecret("resourceId", "resourceSecret")) - Mockito.when(retrofitClient.uploadResources(any(), any(), any(), any())) - .thenAnswer(capturingArgumentsAnswer) dependencyUploaderUnderTest.uploadJars() - testUploadSendsCorrectFiles(LOCAL_JARS, capturingArgumentsAnswer) + testUploadSendsCorrectFiles(LOCAL_JARS) } test("Uploading files should contact the staging server with the appropriate parameters") { - val capturingArgumentsAnswer = new UploadDependenciesArgumentsCapturingAnswer( - SubmittedResourceIdAndSecret("resourceId", "resourceSecret")) - Mockito.when(retrofitClient.uploadResources(any(), any(), any(), any())) - .thenAnswer(capturingArgumentsAnswer) dependencyUploaderUnderTest.uploadFiles() - testUploadSendsCorrectFiles(LOCAL_FILES, capturingArgumentsAnswer) + testUploadSendsCorrectFiles(LOCAL_FILES) } - private def testUploadSendsCorrectFiles( - expectedFiles: Seq[String], - capturingArgumentsAnswer: UploadDependenciesArgumentsCapturingAnswer) = { - val requestLabelsBytes = requestBodyBytes(capturingArgumentsAnswer.podLabelsArg) - val requestLabelsString = new String(requestLabelsBytes, Charsets.UTF_8) - val requestLabelsMap = OBJECT_MAPPER.readValue( - requestLabelsString, classOf[Map[String, String]]) - assert(requestLabelsMap === LABELS) - val requestNamespaceBytes = requestBodyBytes(capturingArgumentsAnswer.podNamespaceArg) - val requestNamespaceString = new String(requestNamespaceBytes, Charsets.UTF_8) - assert(requestNamespaceString === NAMESPACE) - + private def testUploadSendsCorrectFiles(expectedFiles: Seq[String]) = { + val resourceOwnerString = new String( + requestBodyBytes(resourcesOwnerCaptor.getValue), 
Charsets.UTF_8) + val resourceOwner = OBJECT_MAPPER.readValue(resourceOwnerString, classOf[StagedResourcesOwner]) + assert(resourceOwner.ownerLabels === LABELS) + assert(resourceOwner.ownerNamespace === NAMESPACE) val unpackedFilesDir = Utils.createTempDir(namePrefix = "test-unpacked-files") val compressedBytesInput = new ByteArrayInputStream( - requestBodyBytes(capturingArgumentsAnswer.podResourcesArg)) + requestBodyBytes(resourcesDataCaptor.getValue())) CompressionUtils.unpackTarStreamToDirectory(compressedBytesInput, unpackedFilesDir) val writtenFiles = unpackedFilesDir.listFiles assert(writtenFiles.size === expectedFiles.size) @@ -148,25 +145,6 @@ private[spark] class SubmittedDependencyUploaderSuite extends SparkFunSuite with } } -private class UploadDependenciesArgumentsCapturingAnswer(returnValue: SubmittedResourceIdAndSecret) - extends Answer[Call[SubmittedResourceIdAndSecret]] { - - var podLabelsArg: RequestBody = _ - var podNamespaceArg: RequestBody = _ - var podResourcesArg: RequestBody = _ - var kubernetesCredentialsArg: RequestBody = _ - - override def answer(invocationOnMock: InvocationOnMock): Call[SubmittedResourceIdAndSecret] = { - podLabelsArg = invocationOnMock.getArgumentAt(0, classOf[RequestBody]) - podNamespaceArg = invocationOnMock.getArgumentAt(1, classOf[RequestBody]) - podResourcesArg = invocationOnMock.getArgumentAt(2, classOf[RequestBody]) - kubernetesCredentialsArg = invocationOnMock.getArgumentAt(3, classOf[RequestBody]) - val responseCall = mock[Call[SubmittedResourceIdAndSecret]] - Mockito.when(responseCall.execute()).thenReturn(Response.success(returnValue)) - responseCall - } -} - private object SubmittedDependencyUploaderSuite { def createTempFile(extension: String): String = { val dir = Utils.createTempDir() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala index 0604e0d6494ae..0c0908da20d89 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServerSuite.scala @@ -24,10 +24,11 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import okhttp3.{RequestBody, ResponseBody} import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar.mock import retrofit2.Call import org.apache.spark.{SparkFunSuite, SSLOptions} -import org.apache.spark.deploy.kubernetes.{KubernetesCredentials, SSLUtils} +import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.util.Utils /** @@ -40,12 +41,21 @@ import org.apache.spark.util.Utils * receive streamed uploads and can stream downloads. 
*/ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { + private var serviceImpl: ResourceStagingService = _ + private var stagedResourcesCleaner: StagedResourcesCleaner = _ + private var server: ResourceStagingServer = _ private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) private val serverPort = new ServerSocket(0).getLocalPort - private val serviceImpl = new ResourceStagingServiceImpl(Utils.createTempDir()) + private val sslOptionsProvider = new SettableReferenceSslOptionsProvider() - private val server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) + + before { + stagedResourcesCleaner = mock[StagedResourcesCleaner] + serviceImpl = new ResourceStagingServiceImpl( + new StagedResourcesStoreImpl(Utils.createTempDir()), stagedResourcesCleaner) + server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider) + } after { server.stop() @@ -83,20 +93,17 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { val resourcesBytes = Array[Byte](1, 2, 3, 4) val labels = Map("label1" -> "label1Value", "label2" -> "label2value") val namespace = "namespace" - val labelsJson = OBJECT_MAPPER.writer().writeValueAsString(labels) + val resourcesOwner = StagedResourcesOwner( + ownerLabels = labels, + ownerNamespace = namespace, + ownerType = StagedResourcesOwnerType.Pod) + val resourcesOwnerJson = OBJECT_MAPPER.writeValueAsString(resourcesOwner) + val resourcesOwnerRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), resourcesOwnerJson) val resourcesRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes) - val labelsRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), labelsJson) - val namespaceRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.TEXT_PLAIN), namespace) - val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) - val kubernetesCredentialsString = OBJECT_MAPPER.writer() - .writeValueAsString(kubernetesCredentials) - val kubernetesCredentialsBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes) val uploadResponse = retrofitService.uploadResources( - labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) + resourcesRequestBody, resourcesOwnerRequestBody) val resourceIdentifier = getTypedResponseResult(uploadResponse) checkResponseBodyBytesMatches( retrofitService.downloadResources( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImplSuite.scala deleted file mode 100644 index 53396a3f27a1a..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/ResourceStagingServiceImplSuite.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.rest.kubernetes - -import java.io.{ByteArrayInputStream, File} -import java.nio.file.Paths - -import com.google.common.io.Files - -import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.kubernetes.KubernetesCredentials -import org.apache.spark.util.Utils - -/** - * Unit, scala-level tests for KubernetesSparkDependencyServiceImpl. The coverage here - * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the - * implementation methods directly as opposed to over HTTP, as well as check the - * data written to the underlying disk. - */ -class ResourceStagingServiceImplSuite extends SparkFunSuite { - - private val dependencyRootDir = Utils.createTempDir() - private val serviceImpl = new ResourceStagingServiceImpl(dependencyRootDir) - private val resourceBytes = Array[Byte](1, 2, 3, 4) - private val kubernetesCredentials = KubernetesCredentials( - Some("token"), Some("caCert"), Some("key"), Some("cert")) - private val namespace = "namespace" - private val labels = Map("label1" -> "label1value", "label2" -> "label2value") - - test("Uploads should write data to the underlying disk") { - Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { resourceStream => - serviceImpl.uploadResources(labels, namespace, resourceStream, kubernetesCredentials) - } - val resourceNamespaceDir = Paths.get(dependencyRootDir.getAbsolutePath, "namespace").toFile - assert(resourceNamespaceDir.isDirectory, s"Resource namespace dir was not created at" + - s" ${resourceNamespaceDir.getAbsolutePath} or is not a directory.") - val resourceDirs = resourceNamespaceDir.listFiles() - assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + - s" subdirectory. Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") - val resourceTgz = new File(resourceDirs(0), "resources.data") - assert(resourceTgz.isFile, - s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") - val resourceTgzBytes = Files.toByteArray(resourceTgz) - assert(resourceTgzBytes.toSeq === resourceBytes.toSeq, "Incorrect resource bytes were written.") - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleanerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleanerSuite.scala new file mode 100644 index 0000000000000..8b398a9891f34 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesCleanerSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.api.model.{DoneableNamespace, DoneablePod, Namespace, NamespaceList, Pod, PodList, PodListBuilder} +import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} +import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource, Resource} +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.Matchers.{eq => mockitoEq} +import org.mockito.Mockito.{never, verify, when} +import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Clock + +private[spark] class StagedResourcesCleanerSuite extends SparkFunSuite with BeforeAndAfter { + + private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + private type PODSWITHLABELS = FilterWatchListDeletable[ + Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]] + private type PODSINNAMESPACE = NonNamespaceOperation[ + Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + private type NAMESPACES = NonNamespaceOperation[ + Namespace, NamespaceList, DoneableNamespace, Resource[Namespace, DoneableNamespace]] + private type NAMESPACEWITHNAME = Resource[Namespace, DoneableNamespace] + + private val INITIAL_ACCESS_EXPIRATION_MS = 5000L + private val CURRENT_TIME = 10000L + private val RESOURCE_ID = "resource-id" + private val POD_NAMESPACE = "namespace" + private val POD_LABELS = Map("label1" -> "label1value", "label2" -> "label2value") + private val RESOURCES_OWNER = StagedResourcesOwner( + ownerNamespace = POD_NAMESPACE, + ownerLabels = POD_LABELS, + ownerType = StagedResourcesOwnerType.Pod) + + @Mock + private var stagedResourcesStore: StagedResourcesStore = _ + @Mock + private var kubernetesClient: KubernetesClient = _ + @Mock + private var clock: Clock = _ + @Mock + private var cleanerExecutorService: ScheduledExecutorService = _ + @Mock + private var podOperations: PODS = _ + @Mock + private var podsInNamespaceOperations: PODSINNAMESPACE = _ + @Mock + private var podsWithLabelsOperations: PODSWITHLABELS = _ + @Mock + private var namespaceOperations: NAMESPACES = _ + @Mock + private var namedNamespaceOperations: NAMESPACEWITHNAME = _ + private var cleanerUnderTest: StagedResourcesCleaner = _ + + before { + MockitoAnnotations.initMocks(this) + cleanerUnderTest = new StagedResourcesCleanerImpl( + stagedResourcesStore, + kubernetesClient, + cleanerExecutorService, + clock, + INITIAL_ACCESS_EXPIRATION_MS) + when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.withLabels(POD_LABELS.asJava)).thenReturn(podsWithLabelsOperations) + when(kubernetesClient.namespaces()).thenReturn(namespaceOperations) + } + + test("Clean the resource if it is never accessed for the expiration interval.") { + val 
cleanupRunnable = startCleanupAndGetCleanupRunnable() + cleanerUnderTest.registerResourceForCleaning(RESOURCE_ID, RESOURCES_OWNER) + when(clock.getTimeMillis()).thenReturn(CURRENT_TIME + INITIAL_ACCESS_EXPIRATION_MS) + cleanupRunnable.run() + verify(stagedResourcesStore).removeResources(RESOURCE_ID) + verify(kubernetesClient, never()).pods() + } + + test("Don't clean the resource if it is accessed in the expiration interval" + + " and there are owners available.") { + val cleanupRunnable = startCleanupAndGetCleanupRunnable() + cleanerUnderTest.registerResourceForCleaning(RESOURCE_ID, RESOURCES_OWNER) + cleanerUnderTest.markResourceAsUsed(RESOURCE_ID) + when(clock.getTimeMillis()).thenReturn(CURRENT_TIME + INITIAL_ACCESS_EXPIRATION_MS) + when(namespaceOperations.withName(POD_NAMESPACE)).thenReturn(namedNamespaceOperations) + when(namedNamespaceOperations.get()).thenReturn(new Namespace()) + when(podOperations.inNamespace(POD_NAMESPACE)).thenReturn(podsInNamespaceOperations) + when(podsInNamespaceOperations.withLabels(POD_LABELS.asJava)) + .thenReturn(podsWithLabelsOperations) + when(podsWithLabelsOperations.list()).thenReturn( + new PodListBuilder().addNewItemLike(new Pod()).endItem().build()) + cleanupRunnable.run() + verify(stagedResourcesStore, never()).removeResources(RESOURCE_ID) + } + + test("Clean the resource if no owners are available.") { + val cleanupRunnable = startCleanupAndGetCleanupRunnable() + cleanerUnderTest.registerResourceForCleaning(RESOURCE_ID, RESOURCES_OWNER) + cleanerUnderTest.markResourceAsUsed(RESOURCE_ID) + when(clock.getTimeMillis()).thenReturn(CURRENT_TIME + INITIAL_ACCESS_EXPIRATION_MS) + when(namespaceOperations.withName(POD_NAMESPACE)).thenReturn(namedNamespaceOperations) + when(namedNamespaceOperations.get()).thenReturn(new Namespace()) + when(podOperations.inNamespace(POD_NAMESPACE)).thenReturn(podsInNamespaceOperations) + when(podsInNamespaceOperations.withLabels(POD_LABELS.asJava)) + .thenReturn(podsWithLabelsOperations) + when(podsWithLabelsOperations.list()).thenReturn(new PodListBuilder().build()) + cleanupRunnable.run() + verify(stagedResourcesStore).removeResources(RESOURCE_ID) + } + + test("Clean up the resource if the namespace does not exist.") { + val cleanupRunnable = startCleanupAndGetCleanupRunnable() + cleanerUnderTest.registerResourceForCleaning(RESOURCE_ID, RESOURCES_OWNER) + cleanerUnderTest.markResourceAsUsed(RESOURCE_ID) + when(clock.getTimeMillis()).thenReturn(CURRENT_TIME + INITIAL_ACCESS_EXPIRATION_MS) + when(namespaceOperations.withName(POD_NAMESPACE)).thenReturn(namedNamespaceOperations) + when(namedNamespaceOperations.get()).thenReturn(null) + cleanupRunnable.run() + verify(stagedResourcesStore).removeResources(RESOURCE_ID) + } + + private def startCleanupAndGetCleanupRunnable(): Runnable = { + val captor = ArgumentCaptor.forClass(classOf[Runnable]) + cleanerUnderTest.start() + verify(cleanerExecutorService).scheduleAtFixedRate( + captor.capture(), + mockitoEq(30000L), + mockitoEq(30000L), + mockitoEq(TimeUnit.MILLISECONDS)) + captor.getValue + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStoreSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStoreSuite.scala new file mode 100644 index 0000000000000..6b5737ebf2e23 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/StagedResourcesStoreSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.io.{ByteArrayInputStream, File} +import java.nio.file.Paths + +import com.google.common.io.Files +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +private[spark] class StagedResourcesStoreSuite extends SparkFunSuite with BeforeAndAfter { + + private val resourceBytes = Array[Byte](1, 2, 3, 4) + private val namespace = "namespace" + private var dependencyRootDir: File = _ + private var stagedResourcesStore: StagedResourcesStore = _ + + before { + dependencyRootDir = Utils.createTempDir() + stagedResourcesStore = new StagedResourcesStoreImpl(dependencyRootDir) + } + + after { + dependencyRootDir.delete() + } + + test("Uploads should write data to the underlying disk") { + val resourceIdAndSecret = Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { + resourceStream => + stagedResourcesStore.addResources(namespace, resourceStream) + } + val resourceNamespaceDir = Paths.get(dependencyRootDir.getAbsolutePath, "namespace").toFile + assert(resourceNamespaceDir.isDirectory, s"Resource namespace dir was not created at" + + s" ${resourceNamespaceDir.getAbsolutePath} or is not a directory.") + val resourceDirs = resourceNamespaceDir.listFiles() + assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + + s" subdirectory. 
Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") + assert(resourceDirs(0).getName === resourceIdAndSecret.resourceId) + val resourceTgz = new File(resourceDirs(0), "resources.data") + assert(resourceTgz.isFile, + s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") + val resourceTgzBytes = Files.toByteArray(resourceTgz) + assert(resourceTgzBytes.toSeq === resourceBytes.toSeq, "Incorrect resource bytes were written.") + } + + test("Uploading and then getting should return a stream with the written bytes.") { + val resourceIdAndSecret = Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { + resourceStream => + stagedResourcesStore.addResources(namespace, resourceStream) + } + val resources = stagedResourcesStore.getResources(resourceIdAndSecret.resourceId) + assert(resources.map(_.resourcesFile) + .map(Files.toByteArray) + .exists(resourceBytes.sameElements(_))) + assert(resources.exists(_.resourceId == resourceIdAndSecret.resourceId)) + assert(resources.exists(_.resourceSecret == resourceIdAndSecret.resourceSecret)) + } + + test("Uploading and then deleting should result in the resource directory being deleted.") { + val resourceIdAndSecret = Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { + resourceStream => + stagedResourcesStore.addResources(namespace, resourceStream) + } + stagedResourcesStore.removeResources(resourceIdAndSecret.resourceId) + val resourceNamespaceDir = Paths.get(dependencyRootDir.getAbsolutePath, "namespace").toFile + assert(resourceNamespaceDir.listFiles().isEmpty) + assert(stagedResourcesStore.getResources(resourceIdAndSecret.resourceId).isEmpty) + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 95775d262a69d..6a296d6112c97 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -169,11 +169,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.setJars(Seq( CONTAINER_LOCAL_MAIN_APP_RESOURCE, CONTAINER_LOCAL_HELPER_JAR_PATH)) - sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, + sparkConf.set( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", kubernetesTestComponents.clientConfig.getClientKeyFile) - sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE, + sparkConf.set( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", kubernetesTestComponents.clientConfig.getClientCertFile) - sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, + sparkConf.set( + s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", kubernetesTestComponents.clientConfig.getCaCertFile) runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE) }
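
A note on the reworked ResourceStagingServerSuite above: the upload endpoint now takes exactly two multipart parts, the raw resource bytes and a JSON-serialized StagedResourcesOwner, replacing the previous four parts (labels, namespace, resource data, and Kubernetes credentials). A minimal sketch of that call shape, assuming a Retrofit client like the suite's retrofitService, whose construction is not shown in this hunk:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import javax.ws.rs.core.MediaType
import okhttp3.RequestBody

// Build the two request parts the same way the suite does; the label and
// namespace values here are illustrative placeholders.
val mapper = new ObjectMapper().registerModule(new DefaultScalaModule)
val resourcesOwner = StagedResourcesOwner(
  ownerLabels = Map("label1" -> "label1Value"),
  ownerNamespace = "namespace",
  ownerType = StagedResourcesOwnerType.Pod)
val ownerPart = RequestBody.create(
  okhttp3.MediaType.parse(MediaType.APPLICATION_JSON),
  mapper.writeValueAsString(resourcesOwner))
val dataPart = RequestBody.create(
  okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA),
  Array[Byte](1, 2, 3, 4))

// Two parts instead of the previous four; the owner JSON now carries the labels,
// namespace, and owner type that used to be sent as separate parts.
val uploadResponse = retrofitService.uploadResources(dataPart, ownerPart)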
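
The new StagedResourcesCleanerSuite pins down observable cleanup behaviour rather than implementation details: a bundle is removed when it was never marked as used within the expiration window, when its owner namespace no longer exists, or when no pods match the owner labels, and it is kept only while at least one owner pod is still present. A rough sketch of that decision, under the assumption that the real StagedResourcesCleanerImpl (not part of this hunk) factors it into a helper; every name below is invented for illustration:

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.KubernetesClient

// Invented helper name and parameters; only the branch structure is derived
// from the four tests in StagedResourcesCleanerSuite.
def shouldRemove(
    owner: StagedResourcesOwner,
    expiredWithoutAccess: Boolean,
    kubernetesClient: KubernetesClient): Boolean = {
  if (expiredWithoutAccess) {
    // Never accessed within the expiration window: remove without consulting the
    // API server at all (the first test verifies pods() is never called here).
    true
  } else {
    val namespaceExists =
      kubernetesClient.namespaces().withName(owner.ownerNamespace).get() != null
    if (!namespaceExists) {
      // Owner namespace is gone: nothing can claim the resources anymore, remove them.
      true
    } else {
      val ownerPods = kubernetesClient.pods()
        .inNamespace(owner.ownerNamespace)
        .withLabels(owner.ownerLabels.asJava)
        .list()
        .getItems
      // No owner pods left: remove; otherwise keep the bundle alive.
      ownerPods.isEmpty
    }
  }
}

The startCleanupAndGetCleanupRunnable helper also fixes the polling cadence: the captured scheduleAtFixedRate call is verified with an initial delay and period of 30000 milliseconds, so a cleanup pass is expected to run every 30 seconds.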
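
StagedResourcesStoreSuite takes over the on-disk assertions previously made by the deleted ResourceStagingServiceImplSuite and effectively documents the store's contract: addResources writes <root>/<namespace>/<resourceId>/resources.data and hands back a resource id plus secret, getResources returns the stored bytes together with that id and secret, and removeResources deletes the resource directory. A short round trip under those assertions; the return type of getResources is not spelled out in this hunk (the tests only require that it supports map, exists, and isEmpty), so it is treated as Option-like here:

import java.io.ByteArrayInputStream

import com.google.common.io.Files

import org.apache.spark.util.Utils

val store = new StagedResourcesStoreImpl(Utils.createTempDir())
val resourceBytes = Array[Byte](1, 2, 3, 4)

// Write the bytes under an arbitrary namespace; an id/secret pair comes back.
val idAndSecret = Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) {
  resourceStream => store.addResources("namespace", resourceStream)
}

// Reading the resources back yields the same bytes.
val resources = store.getResources(idAndSecret.resourceId)
assert(resources.map(_.resourcesFile)
  .map(Files.toByteArray)
  .exists(resourceBytes.sameElements(_)))

// Removal empties the namespace directory and makes further lookups return nothing.
store.removeResources(idAndSecret.resourceId)
assert(store.getResources(idAndSecret.resourceId).isEmpty)

The final KubernetesSuite hunk is independent of the store: it only switches the driver's client key, client cert, and CA cert settings to keys built from APISERVER_AUTH_DRIVER_CONF_PREFIX and the per-credential suffixes, presumably resolving to properties such as spark.kubernetes.authenticate.driver.clientKeyFile.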