diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 1c8b6798bbdd5..ea7a2e96df0ec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -365,7 +365,7 @@ package object config extends Logging { " resource staging server to download jars.") .internal() .stringConf - .createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH) + .createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH) private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER = ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier") @@ -380,7 +380,23 @@ package object config extends Logging { " resource staging server to download files.") .internal() .stringConf - .createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH) + .createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH) + + private[spark] val INIT_CONTAINER_REMOTE_JARS = + ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteJars") + .doc("Comma-separated list of jar URIs to download in the init-container. This is inferred" + + " from spark.jars.") + .internal() + .stringConf + .createOptional + + private[spark] val INIT_CONTAINER_REMOTE_FILES = + ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteFiles") + .doc("Comma-separated list of file URIs to download in the init-container. This is inferred" + + " from spark.files.") + .internal() + .stringConf + .createOptional private[spark] val INIT_CONTAINER_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image") @@ -388,22 +404,38 @@ package object config extends Logging { .stringConf .createWithDefault(s"spark-driver-init:$sparkVersion") - private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir") + private[spark] val DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedJars.downloadDir") .doc("Location to download local jars to in the driver. When using spark-submit, this" + " directory must be empty and will be mounted as an empty directory volume on the" + " driver pod.") .stringConf .createWithDefault("/var/spark-data/spark-local-jars") - private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir") + private[spark] val DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedFiles.downloadDir") .doc("Location to download local files to in the driver. When using spark-submit, this" + " directory must be empty and will be mounted as an empty directory volume on the" + " driver pod.") .stringConf .createWithDefault("/var/spark-data/spark-local-files") + private[spark] val DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteJars.downloadDir") + .doc("Location to download remotely-located (e.g. HDFS) jars to in the driver. 
When" + + " using spark-submit, this directory must be empty and will be mounted as an empty" + + " directory volume on the driver pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-remote-jars") + + private[spark] val DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteFiles.downloadDir") + .doc("Location to download remotely-located (e.g. HDFS) files to in the driver. When" + + " using spark-submit, this directory must be empty and will be mounted as an empty" + + " directory volume on the driver pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-remote-files") + private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT = ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout") .doc("Timeout before aborting the attempt to download and unpack local dependencies from" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index f82cb88b4c622..360f3976911bd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -70,7 +70,6 @@ package object constants { private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY" - private[spark] val ENV_UPLOADED_JARS_DIR = "SPARK_UPLOADED_JARS_DIR" private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH" private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH" private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS" @@ -92,25 +91,59 @@ package object constants { // V2 submission init container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" - private[spark] val INIT_CONTAINER_SECRETS_VOLUME_NAME = "dependency-secret" - private[spark] val INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH = "/mnt/secrets/spark-init" - private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY = "downloadJarsSecret" - private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY = "downloadFilesSecret" - private[spark] val INIT_CONTAINER_TRUSTSTORE_SECRET_KEY = "trustStore" - private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH = - s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY" - private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH = - s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY" - private[spark] val INIT_CONTAINER_TRUSTSTORE_PATH = - s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_TRUSTSTORE_SECRET_KEY" - private[spark] val INIT_CONTAINER_DOWNLOAD_CREDENTIALS_PATH = - "/mnt/secrets/kubernetes-credentials" - private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "init-driver" - private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "init-container-properties" - private[spark] val INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH = "/etc/spark-init/" - private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "init-driver.properties" - private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH = - s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME" - private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars" - private[spark] val 
DOWNLOAD_FILES_VOLUME_NAME = "download-files" + + // Init container for downloading submitted files from the staging server. + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME = + "spark-driver-download-submitted-files" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME = + "resource-staging-server-secret" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH = + "/mnt/secrets/spark-init" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY = + "downloadSubmittedJarsSecret" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_KEY = + "downloadSubmittedFilesSecret" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY = "trustStore" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH = + s"$INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH/" + + s"$INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH = + s"$INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH/" + + s"$INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_KEY" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_PATH = + s"$INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH/" + + s"$INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY = + "download-submitted-files" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME = + "download-submitted-files-properties" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH = + "/etc/spark-init/download-submitted-files" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME = + "init-driver-download-submitted-files.properties" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_PATH = + s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH/" + + s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME = + "download-submitted-jars" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME = + "download-submitted-files" + + // Init container for fetching remote dependencies. 
+ private[spark] val INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME = + "spark-driver-download-remote-files" + private[spark] val INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY = + "download-remote-files" + private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME = + "download-remote-files-properties" + private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH = + "/etc/spark-init/download-remote-files" + private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME = + "init-driver-download-remote-files.properties" + private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH = + s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH/" + + s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME" + private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME = "download-remote-jars" + private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME = + "download-remote-files" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala similarity index 88% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala index b8e644219097e..55b84bbd747e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.rest.kubernetes.v1 +package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.util.Utils @@ -41,4 +41,8 @@ private[spark] object KubernetesFileUtils { Option(Utils.resolveURI(uri).getScheme).getOrElse("file") == "file" } + def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = { + filterUriStringsByScheme(uris, scheme => scheme != "file" && scheme != "local") + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index e1cfac8feba37..3a93e7cee8253 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -33,7 +33,8 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.CompressionUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesFileUtils, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils +import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource} import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala index 10ffddcd7e7fc..174e9c57a65ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala @@ -29,7 +29,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.kubernetes.v1.{KubernetesFileUtils, PemsToKeyStoreConverter} +import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils +import org.apache.spark.deploy.rest.kubernetes.v1.PemsToKeyStoreConverter import org.apache.spark.util.Utils /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala index 69dbfd041bb86..0c423941cb01f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala @@ -17,13 +17,14 @@ package org.apache.spark.deploy.kubernetes.submit.v2 import 
java.io.File +import java.net.URI import java.util.Collections import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata, OwnerReferenceBuilder, PodBuilder} import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.internal.Logging @@ -50,7 +51,8 @@ private[spark] class Client( appArgs: Array[String], mainAppResource: String, kubernetesClientProvider: SubmissionKubernetesClientProvider, - mountedDependencyManagerProvider: MountedDependencyManagerProvider) extends Logging { + submittedDependencyManagerProvider: SubmittedDependencyManagerProvider, + remoteDependencyManagerProvider: DownloadRemoteDependencyManagerProvider) extends Logging { private val namespace = sparkConf.get(KUBERNETES_NAMESPACE) private val master = resolveK8sMaster(sparkConf.get("spark.master")) @@ -132,27 +134,33 @@ private[spark] class Client( .endSpec() val nonDriverPodKubernetesResources = mutable.Buffer[HasMetadata]() + + // resolvedJars is all of the original Spark jars, but files on the local disk that + // were uploaded to the resource staging server are converted to the paths they would + // have been downloaded at. Similarly for resolvedFiles. + // If the resource staging server isn't being used, then resolvedJars = spark.jars. val resolvedJars = mutable.Buffer[String]() val resolvedFiles = mutable.Buffer[String]() val driverPodWithMountedDeps = maybeStagingServerUri.map { stagingServerUri => - val mountedDependencyManager = mountedDependencyManagerProvider.getMountedDependencyManager( - kubernetesAppId, - stagingServerUri, - allLabels, - namespace, - sparkJars, - sparkFiles) - val jarsResourceIdentifier = mountedDependencyManager.uploadJars() - val filesResourceIdentifier = mountedDependencyManager.uploadFiles() - val initContainerKubernetesSecret = mountedDependencyManager.buildInitContainerSecret( + val submittedDependencyManager = submittedDependencyManagerProvider + .getSubmittedDependencyManager( + kubernetesAppId, + stagingServerUri, + allLabels, + namespace, + sparkJars, + sparkFiles) + val jarsResourceIdentifier = submittedDependencyManager.uploadJars() + val filesResourceIdentifier = submittedDependencyManager.uploadFiles() + val initContainerKubernetesSecret = submittedDependencyManager.buildInitContainerSecret( jarsResourceIdentifier.resourceSecret, filesResourceIdentifier.resourceSecret) - val initContainerConfigMap = mountedDependencyManager.buildInitContainerConfigMap( + val initContainerConfigMap = submittedDependencyManager.buildInitContainerConfigMap( jarsResourceIdentifier.resourceId, filesResourceIdentifier.resourceId) - resolvedJars ++= mountedDependencyManager.resolveSparkJars() - resolvedFiles ++= mountedDependencyManager.resolveSparkFiles() + resolvedJars ++= submittedDependencyManager.resolveSparkJars() + resolvedFiles ++= submittedDependencyManager.resolveSparkFiles() nonDriverPodKubernetesResources += initContainerKubernetesSecret nonDriverPodKubernetesResources += initContainerConfigMap - mountedDependencyManager.configurePodToMountLocalDependencies( + submittedDependencyManager.configurePodToMountLocalDependencies( driverContainer.getName, initContainerKubernetesSecret, initContainerConfigMap, basePod) }.getOrElse { sparkJars.map(Utils.resolveURI).foreach { jar => @@ 
-186,19 +194,35 @@ private[spark] class Client( resolvedSparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => resolvedSparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN.key, "") } + val remoteDependencyManager = remoteDependencyManagerProvider + .getDownloadRemoteDependencyManager( + kubernetesAppId, + resolvedJars, + resolvedFiles) + val downloadRemoteDependenciesConfigMap = remoteDependencyManager + .buildInitContainerConfigMap() + nonDriverPodKubernetesResources += downloadRemoteDependenciesConfigMap + val driverPodWithMountedAndDownloadedDeps = remoteDependencyManager + .configurePodToDownloadRemoteDependencies( + downloadRemoteDependenciesConfigMap, driverContainer.getName, driverPodWithMountedDeps) - val mountedClassPath = resolvedJars.map(Utils.resolveURI).filter { jarUri => - val scheme = Option.apply(jarUri.getScheme).getOrElse("file") - scheme == "local" || scheme == "file" - }.map(_.getPath).mkString(File.pathSeparator) + // The resolved local classpath should *only* contain local file URIs. It consists of the + // driver's classpath (minus spark.driver.extraClassPath which was handled above) with the + // assumption that the remote dependency manager has downloaded all of the remote + // dependencies through its init-container, and thus replaces all the remote URIs with the + // local paths they were downloaded to. + val resolvedLocalClassPath = remoteDependencyManager.resolveLocalClasspath() + resolvedLocalClassPath.foreach { classPathEntry => + require(Option(URI.create(classPathEntry).getScheme).isEmpty) + } val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) => s"-D$confKey=$confValue" }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") - val resolvedDriverPod = driverPodWithMountedDeps.editSpec() + val resolvedDriverPod = driverPodWithMountedAndDownloadedDeps.editSpec() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) .addNewEnv() .withName(ENV_MOUNTED_CLASSPATH) - .withValue(mountedClassPath) + .withValue(resolvedLocalClassPath.mkString(File.pathSeparator)) .endEnv() .addNewEnv() .withName(ENV_DRIVER_JAVA_OPTS) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala new file mode 100644 index 0000000000000..d346c971ad06b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import java.io.File + +import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMountBuilder} + +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils +import org.apache.spark.util.Utils + +private[spark] trait DownloadRemoteDependencyManager { + + def buildInitContainerConfigMap(): ConfigMap + + def configurePodToDownloadRemoteDependencies( + initContainerConfigMap: ConfigMap, + driverContainerName: String, + originalPodSpec: PodBuilder): PodBuilder + + /** + * Return the local classpath of the driver after all of its dependencies have been downloaded. + */ + def resolveLocalClasspath(): Seq[String] +} + +// TODO this is very similar to SubmittedDependencyManagerImpl. We should consider finding a way to +// consolidate the implementations. +private[spark] class DownloadRemoteDependencyManagerImpl( + kubernetesAppId: String, + sparkJars: Seq[String], + sparkFiles: Seq[String], + jarsDownloadPath: String, + filesDownloadPath: String, + initContainerImage: String) extends DownloadRemoteDependencyManager { + + private val jarsToDownload = KubernetesFileUtils.getOnlyRemoteFiles(sparkJars) + private val filesToDownload = KubernetesFileUtils.getOnlyRemoteFiles(sparkFiles) + + override def buildInitContainerConfigMap(): ConfigMap = { + val remoteJarsConf = if (jarsToDownload.nonEmpty) { + Map(INIT_CONTAINER_REMOTE_JARS.key -> jarsToDownload.mkString(",")) + } else { + Map.empty[String, String] + } + val remoteFilesConf = if (filesToDownload.nonEmpty) { + Map(INIT_CONTAINER_REMOTE_FILES.key -> filesToDownload.mkString(",")) + } else { + Map.empty[String, String] + } + val initContainerConfig = Map[String, String]( + DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, + DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++ + remoteJarsConf ++ + remoteFilesConf + PropertiesConfigMapFromScalaMapBuilder.buildConfigMap( + s"$kubernetesAppId-remote-files-download-init", + INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY, + initContainerConfig) + } + + override def configurePodToDownloadRemoteDependencies( + initContainerConfigMap: ConfigMap, + driverContainerName: String, + originalPodSpec: PodBuilder): PodBuilder = { + val sharedVolumeMounts = Seq( + new VolumeMountBuilder() + .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME) + .withMountPath(jarsDownloadPath) + .build(), + new VolumeMountBuilder() + .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME) + .withMountPath(filesDownloadPath) + .build()) + val initContainer = new ContainerBuilder() + .withName(INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME) + .withArgs(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH) + .addNewVolumeMount() + .withName(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME) + .withMountPath(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH) + .endVolumeMount() + .addToVolumeMounts(sharedVolumeMounts: _*) + .withImage(initContainerImage) + .withImagePullPolicy("IfNotPresent") + .build() + InitContainerUtil.appendInitContainer(originalPodSpec, initContainer) + .editSpec() + .addNewVolume() + .withName(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME) + .withNewConfigMap() + .withName(initContainerConfigMap.getMetadata.getName) + .addNewItem() + .withKey(INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) + 
.withPath(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .addNewVolume() + .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME) + .withEmptyDir(new EmptyDirVolumeSource()) + .endVolume() + .addNewVolume() + .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME) + .withEmptyDir(new EmptyDirVolumeSource()) + .endVolume() + .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName)) + .addToVolumeMounts(sharedVolumeMounts: _*) + .endContainer() + .endSpec() + } + + override def resolveLocalClasspath(): Seq[String] = { + sparkJars.map { jar => + val jarUri = Utils.resolveURI(jar) + val scheme = Option.apply(jarUri.getScheme).getOrElse("file") + scheme match { + case "file" | "local" => jarUri.getPath + case _ => + val fileName = new File(jarUri.getPath).getName + s"$jarsDownloadPath/$fileName" + } + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala new file mode 100644 index 0000000000000..b5cc4c24d582d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ + +private[spark] trait DownloadRemoteDependencyManagerProvider { + def getDownloadRemoteDependencyManager( + kubernetesAppId: String, + sparkJars: Seq[String], + sparkFiles: Seq[String]): DownloadRemoteDependencyManager +} + +private[spark] class DownloadRemoteDependencyManagerProviderImpl( + sparkConf: SparkConf) extends DownloadRemoteDependencyManagerProvider { + + override def getDownloadRemoteDependencyManager( + kubernetesAppId: String, + sparkJars: Seq[String], + sparkFiles: Seq[String]): DownloadRemoteDependencyManager = { + new DownloadRemoteDependencyManagerImpl( + kubernetesAppId, + sparkJars, + sparkFiles, + sparkConf.get(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION), + sparkConf.get(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION), + sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/InitContainerUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/InitContainerUtil.scala new file mode 100644 index 0000000000000..0526ca53baaab --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/InitContainerUtil.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.fabric8.kubernetes.api.model.{Container, PodBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.constants._ + +private[spark] object InitContainerUtil { + + private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + + def appendInitContainer( + originalPodSpec: PodBuilder, initContainer: Container): PodBuilder = { + val resolvedInitContainers = originalPodSpec + .editMetadata() + .getAnnotations + .asScala + .get(INIT_CONTAINER_ANNOTATION) + .map { existingInitContainerAnnotation => + val existingInitContainers = OBJECT_MAPPER.readValue( + existingInitContainerAnnotation, classOf[List[Container]]) + existingInitContainers ++ Seq(initContainer) + }.getOrElse(Seq(initContainer)) + val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers) + originalPodSpec + .editMetadata() + .removeFromAnnotations(INIT_CONTAINER_ANNOTATION) + .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers) + .endMetadata() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/PropertiesConfigMapFromScalaMapBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/PropertiesConfigMapFromScalaMapBuilder.scala new file mode 100644 index 0000000000000..cb9194552d2b6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/PropertiesConfigMapFromScalaMapBuilder.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes.submit.v2 + +import java.io.StringWriter +import java.util.Properties + +import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder} + +/** + * Creates a config map from a map object, with a single given key + * and writing the map in a {@link java.util.Properties} format. 
+ */
+private[spark] object PropertiesConfigMapFromScalaMapBuilder {
+
+  def buildConfigMap(
+      configMapName: String,
+      configMapKey: String,
+      config: Map[String, String]): ConfigMap = {
+    val properties = new Properties()
+    config.foreach { case (key, value) => properties.setProperty(key, value) }
+    val propertiesWriter = new StringWriter()
+    properties.store(propertiesWriter,
+      s"Java properties built from Kubernetes config map with name: $configMapName" +
+        s" and config map key: $configMapKey")
+    new ConfigMapBuilder()
+      .withNewMetadata()
+      .withName(configMapName)
+      .endMetadata()
+      .addToData(configMapKey, propertiesWriter.toString)
+      .build()
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala
similarity index 67%
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerImpl.scala
rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala
index 9dbbcd0d56a3b..bf7c846d447fb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala
@@ -16,15 +16,14 @@
  */
 package org.apache.spark.deploy.kubernetes.submit.v2
 
-import java.io.{File, FileOutputStream, StringWriter}
-import java.util.Properties
+import java.io.{File, FileOutputStream}
 import javax.ws.rs.core.MediaType
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.google.common.base.Charsets
 import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, Container, ContainerBuilder, EmptyDirVolumeSource, PodBuilder, Secret, SecretBuilder, VolumeMount, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EmptyDirVolumeSource, PodBuilder, Secret, SecretBuilder, VolumeMount, VolumeMountBuilder}
 import okhttp3.RequestBody
 import retrofit2.Call
 import scala.collection.JavaConverters._
@@ -34,11 +33,12 @@ import org.apache.spark.{SparkException, SSLOptions}
 import org.apache.spark.deploy.kubernetes.CompressionUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.rest.kubernetes.v1.{KubernetesCredentials, KubernetesFileUtils}
+import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
+import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials
 import org.apache.spark.deploy.rest.kubernetes.v2.{ResourceStagingServiceRetrofit, RetrofitClientFactory, StagedResourceIdentifier}
 import org.apache.spark.util.Utils
 
-private[spark] trait MountedDependencyManager {
+private[spark] trait SubmittedDependencyManager {
 
   /**
    * Upload submitter-local jars to the resource staging server.
@@ -77,10 +77,10 @@ private[spark] trait MountedDependencyManager {
 }
 
 /**
- * Default implementation of a MountedDependencyManager that is backed by a
+ * Default implementation of a SubmittedDependencyManager that is backed by a
  * Resource Staging Service.
*/ -private[spark] class MountedDependencyManagerImpl( +private[spark] class SubmittedDependencyManagerImpl( kubernetesAppId: String, podLabels: Map[String, String], podNamespace: String, @@ -92,7 +92,7 @@ private[spark] class MountedDependencyManagerImpl( sparkJars: Seq[String], sparkFiles: Seq[String], stagingServiceSslOptions: SSLOptions, - retrofitClientFactory: RetrofitClientFactory) extends MountedDependencyManager { + retrofitClientFactory: RetrofitClientFactory) extends SubmittedDependencyManager { private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) private def localUriStringsToFiles(uris: Seq[String]): Iterable[File] = { @@ -145,79 +145,57 @@ private[spark] class MountedDependencyManagerImpl( originalPodSpec: PodBuilder): PodBuilder = { val sharedVolumeMounts = Seq[VolumeMount]( new VolumeMountBuilder() - .withName(DOWNLOAD_JARS_VOLUME_NAME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) .withMountPath(jarsDownloadPath) .build(), new VolumeMountBuilder() - .withName(DOWNLOAD_FILES_VOLUME_NAME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) .withMountPath(filesDownloadPath) .build()) - val initContainers = Seq(new ContainerBuilder() - .withName("spark-driver-init") + val initContainer = new ContainerBuilder() + .withName(INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME) .withImage(initContainerImage) .withImagePullPolicy("IfNotPresent") .addNewVolumeMount() - .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME) - .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH) + .withName(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME) + .withMountPath(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH) .endVolumeMount() .addNewVolumeMount() - .withName(INIT_CONTAINER_SECRETS_VOLUME_NAME) - .withMountPath(INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH) + .withName(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) + .withMountPath(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH) .endVolumeMount() .addToVolumeMounts(sharedVolumeMounts: _*) - .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH) - .build()) - - // Make sure we don't override any user-provided init containers by just appending ours to - // the existing list. 
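[Editor's example. The block removed just below previously performed the init-container merge inline; it now lives in the new InitContainerUtil.appendInitContainer introduced earlier in this diff. At the Kubernetes versions this patch targets, init containers are declared through the pod.beta.kubernetes.io/init-containers annotation (see INIT_CONTAINER_ANNOTATION above) as a JSON array, so appending must parse and extend any existing value rather than overwrite it. A minimal, self-contained sketch of that merge semantics using plain Jackson; the object and method names here are illustrative, not part of the patch:

// Illustrative sketch only: mirrors the merge behavior of
// InitContainerUtil.appendInitContainer without the fabric8 builder types.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ArrayNode

object InitContainerAnnotationSketch {
  private val mapper = new ObjectMapper()

  // Parses the existing annotation value (a JSON array of containers), if any,
  // and appends one more container object to it.
  def appendInitContainerJson(
      existingAnnotation: Option[String], containerJson: String): String = {
    val containers = existingAnnotation
      .map(mapper.readTree(_).asInstanceOf[ArrayNode])
      .getOrElse(mapper.createArrayNode())
    containers.add(mapper.readTree(containerJson))
    mapper.writeValueAsString(containers)
  }

  def main(args: Array[String]): Unit = {
    val one = appendInitContainerJson(None, """{"name":"download-submitted-files"}""")
    val two = appendInitContainerJson(Some(one), """{"name":"download-remote-files"}""")
    // Prints both containers in order, showing earlier entries are preserved.
    println(two)
  }
}
]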
- val resolvedInitContainers = originalPodSpec - .editMetadata() - .getAnnotations - .asScala - .get(INIT_CONTAINER_ANNOTATION) - .map { existingInitContainerAnnotation => - val existingInitContainers = OBJECT_MAPPER.readValue( - existingInitContainerAnnotation, classOf[List[Container]]) - existingInitContainers ++ initContainers - }.getOrElse(initContainers) - val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers) - originalPodSpec - .editMetadata() - .removeFromAnnotations(INIT_CONTAINER_ANNOTATION) - .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers) - .endMetadata() + .addToArgs(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_PATH) + .build() + InitContainerUtil.appendInitContainer(originalPodSpec, initContainer) .editSpec() .addNewVolume() - .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME) .withNewConfigMap() .withName(initContainerConfigMap.getMetadata.getName) .addNewItem() - .withKey(INIT_CONTAINER_CONFIG_MAP_KEY) - .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME) + .withKey(INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY) + .withPath(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME) .endItem() .endConfigMap() .endVolume() .addNewVolume() - .withName(DOWNLOAD_JARS_VOLUME_NAME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) .withEmptyDir(new EmptyDirVolumeSource()) .endVolume() .addNewVolume() - .withName(DOWNLOAD_FILES_VOLUME_NAME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) .withEmptyDir(new EmptyDirVolumeSource()) .endVolume() .addNewVolume() - .withName(INIT_CONTAINER_SECRETS_VOLUME_NAME) + .withName(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) .withNewSecret() .withSecretName(initContainerSecret.getMetadata.getName) .endSecret() .endVolume() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName)) .addToVolumeMounts(sharedVolumeMounts: _*) - .addNewEnv() - .withName(ENV_UPLOADED_JARS_DIR) - .withValue(jarsDownloadPath) - .endEnv() .endContainer() .endSpec() } @@ -226,14 +204,14 @@ private[spark] class MountedDependencyManagerImpl( val trustStoreBase64 = stagingServiceSslOptions.trustStore.map { trustStoreFile => require(trustStoreFile.isFile, "Dependency server trustStore provided at" + trustStoreFile.getAbsolutePath + " does not exist or is not a file.") - (INIT_CONTAINER_TRUSTSTORE_SECRET_KEY, + (INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY, BaseEncoding.base64().encode(Files.toByteArray(trustStoreFile))) }.toMap val jarsSecretBase64 = BaseEncoding.base64().encode(jarsSecret.getBytes(Charsets.UTF_8)) val filesSecretBase64 = BaseEncoding.base64().encode(filesSecret.getBytes(Charsets.UTF_8)) val secretData = Map( - INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY -> jarsSecretBase64, - INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY -> filesSecretBase64) ++ + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY -> jarsSecretBase64, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_KEY -> filesSecretBase64) ++ trustStoreBase64 val kubernetesSecret = new SecretBuilder() .withNewMetadata() @@ -246,40 +224,32 @@ private[spark] class MountedDependencyManagerImpl( override def buildInitContainerConfigMap( jarsResourceId: String, filesResourceId: String): ConfigMap = { - val initContainerProperties = new Properties() - initContainerProperties.setProperty(RESOURCE_STAGING_SERVER_URI.key, stagingServerUri) - 
initContainerProperties.setProperty(DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION.key, jarsDownloadPath) - initContainerProperties.setProperty(DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION.key, filesDownloadPath) - initContainerProperties.setProperty( - INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key, jarsResourceId) - initContainerProperties.setProperty( - INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key, INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH) - initContainerProperties.setProperty( - INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key, filesResourceId) - initContainerProperties.setProperty( - INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key, INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH) - initContainerProperties.setProperty(DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT.key, - s"${downloadTimeoutMinutes}m") - stagingServiceSslOptions.trustStore.foreach { _ => - initContainerProperties.setProperty(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key, - INIT_CONTAINER_TRUSTSTORE_PATH) - } - initContainerProperties.setProperty(RESOURCE_STAGING_SERVER_SSL_ENABLED.key, - stagingServiceSslOptions.enabled.toString) - stagingServiceSslOptions.trustStorePassword.foreach { password => - initContainerProperties.setProperty(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password) - } - stagingServiceSslOptions.trustStoreType.foreach { storeType => - initContainerProperties.setProperty(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType) - } - val propertiesWriter = new StringWriter() - initContainerProperties.store(propertiesWriter, "Init-container properties.") - new ConfigMapBuilder() - .withNewMetadata() - .withName(s"$kubernetesAppId-init-properties") - .endMetadata() - .addToData(INIT_CONTAINER_CONFIG_MAP_KEY, propertiesWriter.toString) - .build() + val initContainerConfig = Map[String, String]( + RESOURCE_STAGING_SERVER_URI.key -> stagingServerUri, + DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, + DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath, + INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId, + INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key -> + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH, + INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId, + INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key -> + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH, + DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${downloadTimeoutMinutes}m", + RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> stagingServiceSslOptions.enabled.toString) ++ + stagingServiceSslOptions.trustStore.map { _ => + (RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key, + INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_PATH) + }.toMap ++ + stagingServiceSslOptions.trustStorePassword.map { password => + (RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password) + }.toMap ++ + stagingServiceSslOptions.trustStoreType.map { storeType => + (RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType) + }.toMap + PropertiesConfigMapFromScalaMapBuilder.buildConfigMap( + s"$kubernetesAppId-staging-server-download-init", + INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY, + initContainerConfig) } override def resolveSparkJars(): Seq[String] = resolveLocalFiles(sparkJars, jarsDownloadPath) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala similarity index 76% 
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerProvider.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala index 8f09112132b2c..799fbb1b26c43 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala @@ -20,35 +20,35 @@ import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl -private[spark] trait MountedDependencyManagerProvider { - def getMountedDependencyManager( +private[spark] trait SubmittedDependencyManagerProvider { + def getSubmittedDependencyManager( kubernetesAppId: String, stagingServerUri: String, podLabels: Map[String, String], podNamespace: String, sparkJars: Seq[String], - sparkFiles: Seq[String]): MountedDependencyManager + sparkFiles: Seq[String]): SubmittedDependencyManager } -private[spark] class MountedDependencyManagerProviderImpl(sparkConf: SparkConf) - extends MountedDependencyManagerProvider { - override def getMountedDependencyManager( +private[spark] class SubmittedDependencyManagerProviderImpl(sparkConf: SparkConf) + extends SubmittedDependencyManagerProvider { + override def getSubmittedDependencyManager( kubernetesAppId: String, stagingServerUri: String, podLabels: Map[String, String], podNamespace: String, sparkJars: Seq[String], - sparkFiles: Seq[String]): MountedDependencyManager = { + sparkFiles: Seq[String]): SubmittedDependencyManager = { val resourceStagingServerSslOptions = new SparkSecurityManager(sparkConf) .getSSLOptions("kubernetes.resourceStagingServer") - new MountedDependencyManagerImpl( + new SubmittedDependencyManagerImpl( kubernetesAppId, podLabels, podNamespace, stagingServerUri, sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE), - sparkConf.get(DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION), - sparkConf.get(DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION), + sparkConf.get(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION), + sparkConf.get(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION), sparkConf.get(DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT), sparkJars, sparkFiles, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index ca05fe767146b..7847ba2546594 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -35,6 +35,7 @@ import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkCo import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.kubernetes.CompressionUtils import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils import org.apache.spark.deploy.rest._ import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala
index 680d305985cc0..af6022f9e1465 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala
@@ -25,12 +25,15 @@ import com.google.common.io.Files
 import com.google.common.util.concurrent.SettableFuture
 import okhttp3.ResponseBody
 import retrofit2.{Call, Callback, Response}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
 
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException}
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.CompressionUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private trait WaitableCallback[T] extends Callback[T] {
   private val complete = SettableFuture.create[Boolean]
@@ -61,55 +64,153 @@ private class DownloadTarGzCallback(downloadDir: File) extends WaitableCallback[
   }
 }
 
+// Extracted for testing so that unit tests don't have to depend on Utils.fetchFile
+private[v2] trait FileFetcher {
+  def fetchFile(uri: String, targetDir: File): Unit
+}
+
+private class FileFetcherImpl(sparkConf: SparkConf, securityManager: SparkSecurityManager)
+    extends FileFetcher {
+  def fetchFile(uri: String, targetDir: File): Unit = {
+    Utils.fetchFile(
+      url = uri,
+      targetDir = targetDir,
+      conf = sparkConf,
+      securityMgr = securityManager,
+      hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf),
+      timestamp = System.currentTimeMillis(),
+      useCache = false)
+  }
+}
+
+/**
+ * Process that fetches files from a resource staging server and/or arbitrary remote locations.
+ *
+ * The init-container can handle fetching files from any of those sources, but not all of the
+ * sources need to be specified. This allows for composing multiple instances of this container
+ * with different configurations for different download sources, or using the same container to
+ * download everything at once.
+ */ private[spark] class KubernetesSparkDependencyDownloadInitContainer( - sparkConf: SparkConf, retrofitClientFactory: RetrofitClientFactory) extends Logging { + sparkConf: SparkConf, + retrofitClientFactory: RetrofitClientFactory, + fileFetcher: FileFetcher, + securityManager: SparkSecurityManager) extends Logging { - private val resourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI) - .getOrElse(throw new SparkException("No dependency server URI was provided.")) + private implicit val downloadExecutor = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("download-executor")) + private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI) - private val downloadJarsResourceIdentifier = sparkConf + private val maybeDownloadJarsResourceIdentifier = sparkConf .get(INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER) - .getOrElse(throw new SparkException("No resource identifier provided for jars.")) private val downloadJarsSecretLocation = new File( sparkConf.get(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION)) - private val downloadFilesResourceIdentifier = sparkConf + private val maybeDownloadFilesResourceIdentifier = sparkConf .get(INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER) - .getOrElse(throw new SparkException("No resource identifier provided for files.")) private val downloadFilesSecretLocation = new File( sparkConf.get(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION)) - require(downloadJarsSecretLocation.isFile, "Application jars download secret provided" + - s" at ${downloadJarsSecretLocation.getAbsolutePath} does not exist or is not a file.") - require(downloadFilesSecretLocation.isFile, "Application files download secret provided" + - s" at ${downloadFilesSecretLocation.getAbsolutePath} does not exist or is not a file.") - - private val jarsDownloadDir = new File(sparkConf.get(DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION)) - require(jarsDownloadDir.isDirectory, "Application jars download directory provided at" + - s" ${jarsDownloadDir.getAbsolutePath} does not exist or is not a directory.") - - private val filesDownloadDir = new File(sparkConf.get(DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION)) - require(filesDownloadDir.isDirectory, "Application files download directory provided at" + - s" ${filesDownloadDir.getAbsolutePath} does not exist or is not a directory.") + + private val stagingServerJarsDownloadDir = new File( + sparkConf.get(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION)) + private val stagingServerFilesDownloadDir = new File( + sparkConf.get(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION)) + + private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS) + private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES) + private val remoteJarsDownloadDir = new File( + sparkConf.get(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION)) + private val remoteFilesDownloadDir = new File( + sparkConf.get(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION)) + private val downloadTimeoutMinutes = sparkConf.get(DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT) def run(): Unit = { - val securityManager = new SparkSecurityManager(sparkConf) - val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer") - val service = retrofitClientFactory.createRetrofitClient( - resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions) - val jarsSecret = Files.toString(downloadJarsSecretLocation, Charsets.UTF_8) - val filesSecret = Files.toString(downloadFilesSecretLocation, Charsets.UTF_8) - val downloadJarsCallback = new 
DownloadTarGzCallback(jarsDownloadDir) - val downloadFilesCallback = new DownloadTarGzCallback(filesDownloadDir) - service.downloadResources(downloadJarsResourceIdentifier, jarsSecret) - .enqueue(downloadJarsCallback) - service.downloadResources(downloadFilesResourceIdentifier, filesSecret) - .enqueue(downloadFilesCallback) - logInfo("Waiting to download jars...") - downloadJarsCallback.waitForCompletion(downloadTimeoutMinutes, TimeUnit.MINUTES) - logInfo(s"Jars downloaded to ${jarsDownloadDir.getAbsolutePath}") - logInfo("Waiting to download files...") - downloadFilesCallback.waitForCompletion(downloadTimeoutMinutes, TimeUnit.MINUTES) - logInfo(s"Files downloaded to ${filesDownloadDir.getAbsolutePath}") + val resourceStagingServerJarsDownload = Future[Unit] { + downloadResourcesFromStagingServer( + maybeDownloadJarsResourceIdentifier, + downloadJarsSecretLocation, + stagingServerJarsDownloadDir, + "Starting to download jars from resource staging server...", + "Finished downloading jars from resource staging server.", + s"Application jars download secret provided at" + + s" ${downloadJarsSecretLocation.getAbsolutePath} does not exist or is not a file.", + s"Application jars download directory provided at" + + s" ${stagingServerJarsDownloadDir.getAbsolutePath} does not exist or is not a directory.") + } + val resourceStagingServerFilesDownload = Future[Unit] { + downloadResourcesFromStagingServer( + maybeDownloadFilesResourceIdentifier, + downloadFilesSecretLocation, + stagingServerFilesDownloadDir, + "Starting to download files from resource staging server...", + "Finished downloading files from resource staging server.", + s"Application files download secret provided at" + + s" ${downloadFilesSecretLocation.getAbsolutePath} does not exist or is not a file.", + s"Application files download directory provided at" + + s" ${stagingServerFilesDownloadDir.getAbsolutePath} does not exist or is not" + + s" a directory.") + } + val remoteJarsDownload = Future[Unit] { + downloadFiles(remoteJars, + remoteJarsDownloadDir, + s"Remote jars download directory specified at $remoteJarsDownloadDir does not exist" + + s" or is not a directory.") + } + val remoteFilesDownload = Future[Unit] { + downloadFiles(remoteFiles, + remoteFilesDownloadDir, + s"Remote files download directory specified at $remoteFilesDownloadDir does not exist" + + s" or is not a directory.") + } + waitForFutures( + resourceStagingServerJarsDownload, + resourceStagingServerFilesDownload, + remoteJarsDownload, + remoteFilesDownload) + } + + private def downloadResourcesFromStagingServer( + maybeResourceId: Option[String], + resourceSecretLocation: File, + resourceDownloadDir: File, + downloadStartMessage: String, + downloadFinishedMessage: String, + errMessageOnSecretNotAFile: String, + errMessageOnDownloadDirNotADirectory: String): Unit = { + maybeResourceStagingServerUri.foreach { resourceStagingServerUri => + maybeResourceId.foreach { resourceId => + require(resourceSecretLocation.isFile, errMessageOnSecretNotAFile) + require(resourceDownloadDir.isDirectory, errMessageOnDownloadDirNotADirectory) + val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer") + val service = retrofitClientFactory.createRetrofitClient( + resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions) + val resourceSecret = Files.toString(resourceSecretLocation, Charsets.UTF_8) + val downloadResourceCallback = new DownloadTarGzCallback(resourceDownloadDir) + logInfo(downloadStartMessage) + 
service.downloadResources(resourceId, resourceSecret) + .enqueue(downloadResourceCallback) + downloadResourceCallback.waitForCompletion(downloadTimeoutMinutes, TimeUnit.MINUTES) + logInfo(downloadFinishedMessage) + } + } + } + + private def downloadFiles( + filesCommaSeparated: Option[String], + downloadDir: File, + errMessageOnDestinationNotADirectory: String): Unit = { + if (filesCommaSeparated.isDefined) { + require(downloadDir.isDirectory, errMessageOnDestinationNotADirectory) + } + filesCommaSeparated.map(_.split(",")).toSeq.flatten.foreach { file => + fileFetcher.fetchFile(file, downloadDir) + } + } + + private def waitForFutures(futures: Future[_]*) { + futures.foreach { + ThreadUtils.awaitResult(_, Duration.create(downloadTimeoutMinutes, TimeUnit.MINUTES)) + } } } @@ -121,7 +222,13 @@ object KubernetesSparkDependencyDownloadInitContainer extends Logging { } else { new SparkConf(true) } - new KubernetesSparkDependencyDownloadInitContainer(sparkConf, RetrofitClientFactoryImpl).run() + val securityManager = new SparkSecurityManager(sparkConf) + val fileFetcher = new FileFetcherImpl(sparkConf, securityManager) + new KubernetesSparkDependencyDownloadInitContainer( + sparkConf, + RetrofitClientFactoryImpl, + fileFetcher, + securityManager).run() logInfo("Finished downloading application dependencies.") } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala index 9e2ab26460412..73d073a60148c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala @@ -22,7 +22,8 @@ import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, Container, import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.hamcrest.{BaseMatcher, Description} -import org.mockito.Matchers.{any, anyVararg, argThat, startsWith, eq => mockitoEq} +import org.mockito.{AdditionalAnswers, Mockito} +import org.mockito.Matchers.{any, anyVararg, argThat, eq => mockitoEq, startsWith} import org.mockito.Mockito.when import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -63,6 +64,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { "filesId", "filesSecret") private val MOUNTED_FILES_ANNOTATION_KEY = "mountedFiles" + private val downloadRemoteDependenciesConfigMap = new ConfigMapBuilder() + .withNewMetadata().withName("init-container").endMetadata() + .addToData("key", "value") + .build() + private var sparkConf: SparkConf = _ private var submissionKubernetesClientProvider: SubmissionKubernetesClientProvider = _ private var submissionKubernetesClient: KubernetesClient = _ @@ -71,12 +77,18 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { HasMetadata, Boolean] private var podOperations: PODS = _ private var resourceListOperations: RESOURCES = _ - private var mountedDependencyManagerProvider: MountedDependencyManagerProvider = _ - private var mountedDependencyManager: MountedDependencyManager = _ + private var submittedDependencyManagerProvider: SubmittedDependencyManagerProvider = _ + private var remoteDependencyManagerProvider: 
DownloadRemoteDependencyManagerProvider = _ + private var remoteDependencyManager: DownloadRemoteDependencyManager = _ + private var mountedDependencyManager: SubmittedDependencyManager = _ private var captureCreatedPodAnswer: SelfArgumentCapturingAnswer[Pod] = _ private var captureCreatedResourcesAnswer: AllArgumentsCapturingAnswer[HasMetadata, RESOURCES] = _ + private var capturedJars: Option[Seq[String]] = None + private var capturedFiles: Option[Seq[String]] = None before { + capturedJars = None + capturedFiles = None sparkConf = new SparkConf(true) .set("spark.app.name", APP_NAME) .set("spark.master", "k8s://https://localhost:443") @@ -88,8 +100,30 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { submissionKubernetesClient = mock[KubernetesClient] podOperations = mock[PODS] resourceListOperations = mock[RESOURCES] - mountedDependencyManagerProvider = mock[MountedDependencyManagerProvider] - mountedDependencyManager = mock[MountedDependencyManager] + submittedDependencyManagerProvider = mock[SubmittedDependencyManagerProvider] + mountedDependencyManager = mock[SubmittedDependencyManager] + remoteDependencyManagerProvider = mock[DownloadRemoteDependencyManagerProvider] + remoteDependencyManager = mock[DownloadRemoteDependencyManager] + when(remoteDependencyManagerProvider.getDownloadRemoteDependencyManager(any(), any(), any())) + .thenAnswer(new Answer[DownloadRemoteDependencyManager] { + override def answer(invocationOnMock: InvocationOnMock): DownloadRemoteDependencyManager = { + capturedJars = Some(invocationOnMock.getArgumentAt(1, classOf[Seq[String]])) + capturedFiles = Some(invocationOnMock.getArgumentAt(2, classOf[Seq[String]])) + remoteDependencyManager + } + }) + when(remoteDependencyManager.buildInitContainerConfigMap()) + .thenReturn(downloadRemoteDependenciesConfigMap) + when(remoteDependencyManager.resolveLocalClasspath()).thenAnswer(new Answer[Seq[String]] { + override def answer(invocationOnMock: InvocationOnMock): Seq[String] = { + assert(capturedJars.isDefined) + capturedJars.toSeq.flatten.map(Utils.resolveURI(_).getPath) + } + }) + when(remoteDependencyManager.configurePodToDownloadRemoteDependencies( + mockitoEq(downloadRemoteDependenciesConfigMap), + any(), + any())).thenAnswer(AdditionalAnswers.returnsArgAt(2)) when(submissionKubernetesClientProvider.get).thenReturn(submissionKubernetesClient) when(submissionKubernetesClient.pods()).thenReturn(podOperations) captureCreatedPodAnswer = new SelfArgumentCapturingAnswer[Pod] @@ -176,9 +210,9 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { runWithMountedDependencies(initContainerConfigMap, initContainerSecret) val driverPod = captureCreatedPodAnswer.capturedArgument assert(captureCreatedResourcesAnswer.capturedArguments != null) - assert(captureCreatedResourcesAnswer.capturedArguments.size === 2) + assert(captureCreatedResourcesAnswer.capturedArguments.size === 3) assert(captureCreatedResourcesAnswer.capturedArguments.toSet === - Set(initContainerSecret, initContainerConfigMap)) + Set(initContainerSecret, initContainerConfigMap, downloadRemoteDependenciesConfigMap)) captureCreatedResourcesAnswer.capturedArguments.foreach { resource => val driverPodOwnerReferences = resource.getMetadata.getOwnerReferences assert(driverPodOwnerReferences.size === 1) @@ -211,12 +245,51 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } } + test("Remote dependency manager should configure the driver pod and the local classpath") { + Mockito.reset(remoteDependencyManager) + 
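// reset() discards the stubbing installed in before{}; the stubs below instead
+ // return a pod edited with a marker label, so the test can verify that the client
+ // actually applied the remote dependency manager's modifications to the driver pod.
+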
when(remoteDependencyManager + .configurePodToDownloadRemoteDependencies( + mockitoEq(downloadRemoteDependenciesConfigMap), any(), any())) + .thenAnswer(new Answer[PodBuilder]() { + override def answer(invocationOnMock: InvocationOnMock): PodBuilder = { + val originalPod = invocationOnMock.getArgumentAt(2, classOf[PodBuilder]) + originalPod.editMetadata().addToLabels("added-remote-dependency", "true").endMetadata() + } + }) + when(remoteDependencyManager.resolveLocalClasspath()) + .thenReturn(Seq("/app/jars/resolved-jar-1.jar", "/app/jars/resolved-jar-2.jar")) + when(remoteDependencyManager.buildInitContainerConfigMap()) + .thenReturn(downloadRemoteDependenciesConfigMap) + val createdDriverPod = createAndGetDriverPod() + Mockito.verify(remoteDependencyManager).configurePodToDownloadRemoteDependencies( + mockitoEq(downloadRemoteDependenciesConfigMap), + any(), + any()) + assert(createdDriverPod.getMetadata.getLabels.get("added-remote-dependency") === "true") + val driverContainer = createdDriverPod + .getSpec + .getContainers + .asScala + .find(_.getName == DRIVER_CONTAINER_NAME) + assert(driverContainer.isDefined) + driverContainer.foreach { container => + val env = container.getEnv.asScala + val mountedClasspathEnv = env.find(_.getName == ENV_MOUNTED_CLASSPATH) + assert(mountedClasspathEnv.isDefined) + mountedClasspathEnv.foreach { classpathEnv => + assert(classpathEnv.getValue === + "/app/jars/resolved-jar-1.jar:/app/jars/resolved-jar-2.jar") + } + } + } + private def getInitContainerSecret(): Secret = { new SecretBuilder() .withNewMetadata().withName(s"$APP_NAME-init-container-secret").endMetadata() .addToData( - INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY, DOWNLOAD_JARS_RESOURCE_IDENTIFIER.resourceSecret) - .addToData(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY, + DOWNLOAD_JARS_RESOURCE_IDENTIFIER.resourceSecret) + .addToData(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_KEY, DOWNLOAD_FILES_RESOURCE_IDENTIFIER.resourceSecret) .build() } @@ -247,7 +320,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { description.appendText("Checks if the labels contain the app ID and app name.") } } - when(mountedDependencyManagerProvider.getMountedDependencyManager( + when(submittedDependencyManagerProvider.getSubmittedDependencyManager( startsWith(APP_NAME), mockitoEq(STAGING_SERVER_URI), argThat(labelsMatcher), @@ -303,7 +376,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_ARGS, MAIN_APP_RESOURCE, submissionKubernetesClientProvider, - mountedDependencyManagerProvider) + submittedDependencyManagerProvider, + remoteDependencyManagerProvider) } private class SelfArgumentCapturingAnswer[T: ClassTag] extends Answer[T] { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala new file mode 100644 index 0000000000000..df41516d10f91 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import java.io.StringReader +import java.util.Properties + +import com.fasterxml.jackson.databind.ObjectMapper +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, Container, PodBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ + +class DownloadRemoteDependencyManagerSuite extends SparkFunSuite { + + private val OBJECT_MAPPER = new ObjectMapper() + private val APP_ID = "app-id" + private val SPARK_JARS = Seq( + "hdfs://localhost:9000/jar1.jar", + "local:///app/jars/jar2.jar", + "http://localhost:8080/jar2.jar") + private val SPARK_FILES = Seq( + "hdfs://localhost:9000/file.txt", + "file:///app/files/file.txt") + private val JARS_DOWNLOAD_PATH = "/var/data/spark-data/spark-jars" + private val FILES_DOWNLOAD_PATH = "/var/data/spark-files/spark-files" + private val INIT_CONTAINER_IMAGE = "spark-driver-init:latest" + private val dependencyManagerUnderTest = new DownloadRemoteDependencyManagerImpl( + APP_ID, + SPARK_JARS, + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + INIT_CONTAINER_IMAGE) + + test("Config map should set the files to download") { + val configMap = dependencyManagerUnderTest.buildInitContainerConfigMap() + assert(configMap.getMetadata.getName === s"$APP_ID-remote-files-download-init") + val configProperties = configMap.getData.asScala + assert(configProperties.size === 1) + val propertiesString = configProperties(INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) + assert(propertiesString != null) + val propertiesReader = new StringReader(propertiesString) + val initContainerProperties = new Properties() + initContainerProperties.load(propertiesReader) + assert(initContainerProperties.size() === 4) + val downloadRemoteJars = initContainerProperties.getProperty(INIT_CONTAINER_REMOTE_JARS.key) + assert(downloadRemoteJars != null) + val downloadRemoteJarsSplit = downloadRemoteJars.split(",").toSet + val expectedRemoteJars = Set( + "hdfs://localhost:9000/jar1.jar", "http://localhost:8080/jar2.jar") + assert(expectedRemoteJars === downloadRemoteJarsSplit) + val downloadRemoteFiles = initContainerProperties.getProperty(INIT_CONTAINER_REMOTE_FILES.key) + assert(downloadRemoteFiles != null) + val downloadRemoteFilesSplit = downloadRemoteFiles.split(",").toSet + val expectedRemoteFiles = Set("hdfs://localhost:9000/file.txt") + assert(downloadRemoteFilesSplit === expectedRemoteFiles) + assert(initContainerProperties.getProperty(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION.key) === + JARS_DOWNLOAD_PATH) + assert(initContainerProperties.getProperty(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION.key) === + FILES_DOWNLOAD_PATH) + } + + test("Pod should have an appropriate init-container attached") { + val originalPodSpec = new PodBuilder() + .withNewMetadata() + .withName("driver") + .endMetadata() + 
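// A bare driver pod spec: configurePodToDownloadRemoteDependencies is expected to
+ // attach the init-container annotation, the download volumes, and the container
+ // mounts that are asserted on below.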
+      .withNewSpec()
+        .addNewContainer()
+          .withName("driver")
+          .endContainer()
+        .endSpec()
+    val configMap = new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName("config-map")
+        .endMetadata()
+      .build()
+    val adjustedPod = dependencyManagerUnderTest.configurePodToDownloadRemoteDependencies(
+      configMap, "driver", originalPodSpec).build()
+    val annotations = adjustedPod.getMetadata.getAnnotations
+    assert(annotations.size === 1)
+    val initContainerAnnotation = annotations.get(INIT_CONTAINER_ANNOTATION)
+    assert(initContainerAnnotation != null)
+    val initContainers = OBJECT_MAPPER.readValue(initContainerAnnotation, classOf[Array[Container]])
+    assert(initContainers.length === 1)
+    val initContainer = initContainers(0)
+    assert(initContainer.getName === INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME)
+    assert(initContainer.getArgs.size() === 1)
+    assert(initContainer.getArgs.get(0) === INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH)
+    assert(initContainer.getImage === INIT_CONTAINER_IMAGE)
+    assert(initContainer.getImagePullPolicy === "IfNotPresent")
+    val initContainerVolumeMounts = initContainer
+      .getVolumeMounts
+      .asScala
+      .map { mount =>
+        (mount.getName, mount.getMountPath)
+      }.toSet
+    val expectedVolumeMounts = Set(
+      (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME, JARS_DOWNLOAD_PATH),
+      (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME, FILES_DOWNLOAD_PATH),
+      (INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME,
+        INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH))
+    assert(initContainerVolumeMounts === expectedVolumeMounts)
+    val podVolumes = adjustedPod.getSpec.getVolumes.asScala.map { volume =>
+      (volume.getName, volume)
+    }.toMap
+    assert(podVolumes.size === 3)
+    assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME).isDefined)
+    val propertiesConfigMapVolume = podVolumes(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME)
+    assert(propertiesConfigMapVolume.getConfigMap != null)
+    val configMapItems = propertiesConfigMapVolume.getConfigMap.getItems.asScala
+    assert(configMapItems.size === 1)
+    assert(configMapItems(0).getKey === INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY)
+    assert(configMapItems(0).getPath === INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME)
+    assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME).isDefined)
+    assert(podVolumes(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME).getEmptyDir != null)
+    assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME).isDefined)
+    assert(podVolumes(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME).getEmptyDir != null)
+    val addedVolumeMounts = adjustedPod
+      .getSpec
+      .getContainers
+      .get(0)
+      .getVolumeMounts
+      .asScala
+      .map { mount => (mount.getName, mount.getMountPath) }
+      .toSet
+    val expectedAddedVolumeMounts = Set(
+      (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME, JARS_DOWNLOAD_PATH),
+      (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME, FILES_DOWNLOAD_PATH))
+    assert(addedVolumeMounts === expectedAddedVolumeMounts)
+  }
+
+  test("Resolving the local classpath should map remote jars to their downloaded locations") {
+    val resolvedLocalClasspath = dependencyManagerUnderTest.resolveLocalClasspath()
+    val expectedLocalClasspath = Set(
+      s"$JARS_DOWNLOAD_PATH/jar1.jar",
+      s"$JARS_DOWNLOAD_PATH/jar2.jar",
+      "/app/jars/jar2.jar")
+    assert(resolvedLocalClasspath.toSet === expectedLocalClasspath)
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala similarity index 84% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala index 321fe1b3fd889..4b04681868d38 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/MountedDependencyManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala @@ -43,8 +43,8 @@ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.kubernetes.v2.{ResourceStagingServiceRetrofit, RetrofitClientFactory, StagedResourceIdentifier} import org.apache.spark.util.Utils -private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with BeforeAndAfter { - import MountedDependencyManagerSuite.createTempFile +private[spark] class SubmittedDependencyManagerSuite extends SparkFunSuite with BeforeAndAfter { + import SubmittedDependencyManagerSuite.createTempFile private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) private val APP_ID = "app-id" @@ -52,8 +52,8 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be private val NAMESPACE = "namespace" private val STAGING_SERVER_URI = "http://localhost:8000" private val INIT_CONTAINER_IMAGE = "spark-driver-init:latest" - private val JARS_DOWNLOAD_PATH = DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION.defaultValue.get - private val FILES_DOWNLOAD_PATH = DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION.defaultValue.get + private val JARS_DOWNLOAD_PATH = DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.defaultValue.get + private val FILES_DOWNLOAD_PATH = DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.defaultValue.get private val DOWNLOAD_TIMEOUT_MINUTES = 5 private val LOCAL_JARS = Seq(createTempFile("jar"), createTempFile("jar")) private val JARS = Seq("hdfs://localhost:9000/jars/jar1.jar", @@ -77,7 +77,7 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be private var retrofitClientFactory: RetrofitClientFactory = _ private var retrofitClient: ResourceStagingServiceRetrofit = _ - private var dependencyManagerUnderTest: MountedDependencyManager = _ + private var dependencyManagerUnderTest: SubmittedDependencyManager = _ before { retrofitClientFactory = mock[RetrofitClientFactory] @@ -86,7 +86,7 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be retrofitClientFactory.createRetrofitClient( STAGING_SERVER_URI, classOf[ResourceStagingServiceRetrofit], STAGING_SERVER_SSL_OPTIONS)) .thenReturn(retrofitClient) - dependencyManagerUnderTest = new MountedDependencyManagerImpl( + dependencyManagerUnderTest = new SubmittedDependencyManagerImpl( APP_ID, LABELS, NAMESPACE, @@ -126,17 +126,17 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be val secret = dependencyManagerUnderTest.buildInitContainerSecret("jarsSecret", "filesSecret") assert(secret.getMetadata.getName === s"$APP_ID-spark-init") val expectedSecrets = Map( - INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY -> jarsSecretBase64, - 
INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY -> filesSecretBase64, - INIT_CONTAINER_TRUSTSTORE_SECRET_KEY -> trustStoreBase64) + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY -> jarsSecretBase64, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_KEY -> filesSecretBase64, + INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY -> trustStoreBase64) assert(secret.getData.asScala === expectedSecrets) } test("Init container config map should contain parameters for downloading from staging server") { val configMap = dependencyManagerUnderTest.buildInitContainerConfigMap( JARS_RESOURCE_ID, FILES_RESOURCE_ID) - assert(configMap.getMetadata.getName === s"$APP_ID-init-properties") - val propertiesRawString = configMap.getData.get(INIT_CONTAINER_CONFIG_MAP_KEY) + assert(configMap.getMetadata.getName === s"$APP_ID-staging-server-download-init") + val propertiesRawString = configMap.getData.get(INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY) assert(propertiesRawString != null) val propertiesReader = new StringReader(propertiesRawString) val properties = new Properties() @@ -146,16 +146,16 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be }.toMap val expectedProperties = Map[String, String]( RESOURCE_STAGING_SERVER_URI.key -> STAGING_SERVER_URI, - DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH, - DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH, + DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH, + DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH, INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> JARS_RESOURCE_ID, INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key -> - INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH, INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> FILES_RESOURCE_ID, INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key -> - INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH, DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${DOWNLOAD_TIMEOUT_MINUTES}m", - RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key -> INIT_CONTAINER_TRUSTSTORE_PATH, + RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key -> INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_PATH, RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> "true", RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key -> TRUSTSTORE_PASSWORD, RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key -> TRUSTSTORE_TYPE) @@ -188,7 +188,7 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be initContainerRawAnnotation, classOf[Array[Container]]) assert(initContainers.size === 1) val initContainer = initContainers.head - assert(initContainer.getName === "spark-driver-init") + assert(initContainer.getName === INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME) assert(initContainer.getImage === INIT_CONTAINER_IMAGE) assert(initContainer.getImagePullPolicy === "IfNotPresent") val volumeMounts = initContainer.getVolumeMounts @@ -196,28 +196,31 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be .map(mount => (mount.getName, mount.getMountPath)) .toMap val expectedVolumeMounts = Map[String, String]( - DOWNLOAD_JARS_VOLUME_NAME -> JARS_DOWNLOAD_PATH, - DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH, - INIT_CONTAINER_PROPERTIES_FILE_VOLUME -> INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH, - INIT_CONTAINER_SECRETS_VOLUME_NAME -> INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH) + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME -> 
JARS_DOWNLOAD_PATH, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH, + INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME -> + INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH, + INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME -> + INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH) assert(volumeMounts === expectedVolumeMounts) } test("Driver pod should have added volumes and volume mounts for file downloads") { val driverPod = configureDriverPod() val volumes = driverPod.getSpec.getVolumes.asScala.map(volume => (volume.getName, volume)).toMap - val initContainerPropertiesVolume = volumes(INIT_CONTAINER_PROPERTIES_FILE_VOLUME).getConfigMap + val initContainerPropertiesVolume = volumes( + INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME).getConfigMap assert(initContainerPropertiesVolume != null) assert(initContainerPropertiesVolume.getName === "config") assert(initContainerPropertiesVolume.getItems.asScala.exists { keyToPath => - keyToPath.getKey == INIT_CONTAINER_CONFIG_MAP_KEY && - keyToPath.getPath == INIT_CONTAINER_PROPERTIES_FILE_NAME + keyToPath.getKey == INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY && + keyToPath.getPath == INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME }) - val jarsVolume = volumes(DOWNLOAD_JARS_VOLUME_NAME) + val jarsVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) assert(jarsVolume.getEmptyDir != null) - val filesVolume = volumes(DOWNLOAD_FILES_VOLUME_NAME) + val filesVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) assert(filesVolume.getEmptyDir != null) - val initContainerSecretVolume = volumes(INIT_CONTAINER_SECRETS_VOLUME_NAME) + val initContainerSecretVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) assert(initContainerSecretVolume.getSecret != null) assert(initContainerSecretVolume.getSecret.getSecretName === "secret") val driverContainer = driverPod.getSpec @@ -229,13 +232,9 @@ private[spark] class MountedDependencyManagerSuite extends SparkFunSuite with Be .map(mount => (mount.getName, mount.getMountPath)) .toMap val expectedVolumeMountNamesAndPaths = Map[String, String]( - DOWNLOAD_JARS_VOLUME_NAME -> JARS_DOWNLOAD_PATH, - DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH) + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME -> JARS_DOWNLOAD_PATH, + INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH) assert(driverContainerVolumeMounts === expectedVolumeMountNamesAndPaths) - val envs = driverContainer.getEnv - assert(envs.size() === 1) - assert(envs.asScala.head.getName === ENV_UPLOADED_JARS_DIR) - assert(envs.asScala.head.getValue === JARS_DOWNLOAD_PATH) } private def configureDriverPod(): Pod = { @@ -313,7 +312,7 @@ private class UploadDependenciesArgumentsCapturingAnswer(returnValue: StagedReso } } -private object MountedDependencyManagerSuite { +private object SubmittedDependencyManagerSuite { def createTempFile(extension: String): String = { val dir = Utils.createTempDir() val file = new File(dir, s"${UUID.randomUUID().toString}.$extension") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala index 77eb7f2b9f49c..a572981519857 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala @@ -24,6 +24,7 @@ import com.google.common.base.Charsets import com.google.common.io.Files import okhttp3.{MediaType, ResponseBody} import org.mockito.Matchers.any +import org.mockito.Mockito import org.mockito.Mockito.{doAnswer, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -31,7 +32,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.mock.MockitoSugar._ import retrofit2.{Call, Callback, Response} -import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions} +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.CompressionUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.util.Utils @@ -55,7 +56,6 @@ class KubernetesSparkDependencyDownloadInitContainerSuite private val JARS_RESOURCE_ID = "jarsId" private val FILES_RESOURCE_ID = "filesId" - private var sparkConf: SparkConf = _ private var downloadJarsDir: File = _ private var downloadFilesDir: File = _ private var downloadJarsSecretValue: String = _ @@ -64,7 +64,7 @@ class KubernetesSparkDependencyDownloadInitContainerSuite private var filesCompressedBytes: Array[Byte] = _ private var retrofitClientFactory: RetrofitClientFactory = _ private var retrofitClient: ResourceStagingServiceRetrofit = _ - private var initContainerUnderTest: KubernetesSparkDependencyDownloadInitContainer = _ + private var fileFetcher: FileFetcher = _ override def beforeAll(): Unit = { jarsCompressedBytes = compressPathsToBytes(JARS) @@ -80,24 +80,10 @@ class KubernetesSparkDependencyDownloadInitContainerSuite downloadFilesDir = Utils.createTempDir() retrofitClientFactory = mock[RetrofitClientFactory] retrofitClient = mock[ResourceStagingServiceRetrofit] - sparkConf = new SparkConf(true) - .set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI) - .set(INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER, JARS_RESOURCE_ID) - .set(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION, DOWNLOAD_JARS_SECRET_LOCATION) - .set(INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER, FILES_RESOURCE_ID) - .set(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION, DOWNLOAD_FILES_SECRET_LOCATION) - .set(DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath) - .set(DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath) - .set(RESOURCE_STAGING_SERVER_SSL_ENABLED, true) - .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE, TRUSTSTORE_FILE.getAbsolutePath) - .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD) - .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE, TRUSTSTORE_TYPE) - + fileFetcher = mock[FileFetcher] when(retrofitClientFactory.createRetrofitClient( STAGING_SERVER_URI, classOf[ResourceStagingServiceRetrofit], STAGING_SERVER_SSL_OPTIONS)) .thenReturn(retrofitClient) - initContainerUnderTest = new KubernetesSparkDependencyDownloadInitContainer( - sparkConf, retrofitClientFactory) } after { @@ -105,9 +91,15 @@ class KubernetesSparkDependencyDownloadInitContainerSuite downloadFilesDir.delete() } - test("Downloads should unpack response body streams to directories") { + test("Downloads from resource staging server should unpack response body to directories") { val downloadJarsCall = 
mock[Call[ResponseBody]]
     val downloadFilesCall = mock[Call[ResponseBody]]
+    val sparkConf = getSparkConfForResourceStagingServerDownloads
+    val initContainerUnderTest = new KubernetesSparkDependencyDownloadInitContainer(
+      sparkConf,
+      retrofitClientFactory,
+      fileFetcher,
+      securityManager = new SparkSecurityManager(sparkConf))
     when(retrofitClient.downloadResources(JARS_RESOURCE_ID, downloadJarsSecretValue))
       .thenReturn(downloadJarsCall)
     when(retrofitClient.downloadResources(FILES_RESOURCE_ID, downloadFilesSecretValue))
@@ -125,6 +117,46 @@
     initContainerUnderTest.run()
     checkWrittenFilesAreTheSameAsOriginal(JARS, downloadJarsDir)
     checkWrittenFilesAreTheSameAsOriginal(FILES, downloadFilesDir)
+    Mockito.verifyZeroInteractions(fileFetcher)
+  }
+
+  test("Downloads from remote server should invoke the file fetcher") {
+    val sparkConf = getSparkConfForRemoteFileDownloads
+    val initContainerUnderTest = new KubernetesSparkDependencyDownloadInitContainer(
+      sparkConf,
+      retrofitClientFactory,
+      fileFetcher,
+      securityManager = new SparkSecurityManager(sparkConf))
+    initContainerUnderTest.run()
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
+    Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)
+
+  }
+
+  private def getSparkConfForResourceStagingServerDownloads: SparkConf = {
+    new SparkConf(true)
+      .set(RESOURCE_STAGING_SERVER_URI, STAGING_SERVER_URI)
+      .set(INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER, JARS_RESOURCE_ID)
+      .set(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION, DOWNLOAD_JARS_SECRET_LOCATION)
+      .set(INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER, FILES_RESOURCE_ID)
+      .set(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION, DOWNLOAD_FILES_SECRET_LOCATION)
+      .set(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+      .set(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
+      .set(RESOURCE_STAGING_SERVER_SSL_ENABLED, true)
+      .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE, TRUSTSTORE_FILE.getAbsolutePath)
+      .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD)
+      .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE, TRUSTSTORE_TYPE)
+  }
+
+  private def getSparkConfForRemoteFileDownloads: SparkConf = {
+    new SparkConf(true)
+      .set(INIT_CONTAINER_REMOTE_JARS,
+        "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar")
+      .set(INIT_CONTAINER_REMOTE_FILES,
+        "http://localhost:9000/file.txt")
+      .set(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+      .set(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
+  }

   private def checkWrittenFilesAreTheSameAsOriginal(
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index ac7a549c9b483..c2573f4ca5e57 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -197,6 +197,28 @@
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>3.0.2</version>
+        <executions>
+          <execution>
+            <id>copy-integration-test-http-server-dockerfile</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/dockerfiles</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/docker</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>com.googlecode.maven-download-plugin</groupId>
         <artifactId>download-maven-plugin</artifactId>
diff --git a/resource-managers/kubernetes/integration-tests/src/main/docker/integration-test-asset-server/Dockerfile 
b/resource-managers/kubernetes/integration-tests/src/main/docker/integration-test-asset-server/Dockerfile new file mode 100644 index 0000000000000..e26d207cf4397 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/main/docker/integration-test-asset-server/Dockerfile @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Simple asset server that can provide the integration test jars over HTTP. +FROM trinitronx/python-simplehttpserver:travis-12 + +ADD examples/integration-tests-jars /var/www diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index abbf7e4d5ce1b..faed39593e4bd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -42,9 +42,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite { } override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = { - Vector( - new KubernetesV1Suite, - new KubernetesV2Suite) + Vector( + new KubernetesV1Suite, + new KubernetesV2Suite) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 0d74067334028..479060891e4ed 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -25,7 +25,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions} import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube -import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl} +import org.apache.spark.deploy.kubernetes.submit.v2.{DownloadRemoteDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl, SubmittedDependencyManagerProviderImpl} +import org.apache.spark.launcher.SparkLauncher @DoNotDiscover private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter { @@ -34,11 +35,14 @@ private[spark] class 
KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter private var kubernetesTestComponents: KubernetesTestComponents = _ private var sparkConf: SparkConf = _ private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _ + private var staticAssetServerLauncher: StaticAssetServerLauncher = _ override def beforeAll(): Unit = { kubernetesTestComponents = new KubernetesTestComponents resourceStagingServerLauncher = new ResourceStagingServerLauncher( kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) + staticAssetServerLauncher = new StaticAssetServerLauncher( + kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) } before { @@ -87,6 +91,25 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter runSparkAppAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE) } + test("Use remote resources without the resource staging server.") { + val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() + sparkConf.setJars(Seq( + s"$assetServerUri/${KubernetesSuite.EXAMPLES_JAR_FILE.getName}", + s"$assetServerUri/${KubernetesSuite.HELPER_JAR_FILE.getName}" + )) + runSparkAppAndVerifyCompletion(SparkLauncher.NO_RESOURCE) + } + + test("Mix remote resources with submitted ones.") { + launchStagingServer(SSLOptions()) + val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() + sparkConf.setJars(Seq( + KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + s"$assetServerUri/${KubernetesSuite.HELPER_JAR_FILE.getName}" + )) + runSparkAppAndVerifyCompletion(SparkLauncher.NO_RESOURCE) + } + private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer( resourceStagingServerSslOptions) @@ -107,8 +130,10 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter mainAppResource = appResource, kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf), - mountedDependencyManagerProvider = - new MountedDependencyManagerProviderImpl(sparkConf)) + submittedDependencyManagerProvider = + new SubmittedDependencyManagerProviderImpl(sparkConf), + remoteDependencyManagerProvider = + new DownloadRemoteDependencyManagerProviderImpl(sparkConf)) client.run() val driverPod = kubernetesTestComponents.kubernetesClient .pods() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala index ca549fa27d630..3a99f907d15fd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala @@ -43,7 +43,6 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC private val PROPERTIES_FILE_NAME = "staging-server.properties" private val PROPERTIES_DIR = "/var/data/spark-staging-server" private val PROPERTIES_FILE_PATH = s"$PROPERTIES_DIR/$PROPERTIES_FILE_NAME" - private var activeResources = Seq.empty[HasMetadata] // Returns the NodePort the staging server is listening on def 
launchStagingServer(sslOptions: SSLOptions): Int = { @@ -146,8 +145,8 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC .endPort() .endSpec() .build() - val stagingServerPodReadyWatcher = new ReadinessWatcher[Pod] - val serviceReadyWatcher = new ReadinessWatcher[Endpoints] + val stagingServerPodReadyWatcher = new SparkReadinessWatcher[Pod] + val serviceReadyWatcher = new SparkReadinessWatcher[Endpoints] val allResources = Seq( stagingServerService, stagingServerConfigMap, @@ -159,9 +158,7 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC Utils.tryWithResource(kubernetesClient.endpoints() .withName(stagingServerService.getMetadata.getName) .watch(serviceReadyWatcher)) { _ => - activeResources = kubernetesClient.resourceList(allResources: _*) - .createOrReplace() - .asScala + kubernetesClient.resourceList(allResources: _*).createOrReplace() stagingServerPodReadyWatcher.waitUntilReady() serviceReadyWatcher.waitUntilReady() } @@ -172,25 +169,4 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC .get(0) .getNodePort } - - def tearDownStagingServer(): Unit = { - kubernetesClient.resourceList(activeResources: _*).delete() - activeResources = Seq.empty[HasMetadata] - } - - private class ReadinessWatcher[T <: HasMetadata] extends Watcher[T] { - - private val signal = SettableFuture.create[Boolean] - - override def eventReceived(action: Action, resource: T): Unit = { - if ((action == Action.MODIFIED || action == Action.ADDED) && - Readiness.isReady(resource)) { - signal.set(true) - } - } - - override def onClose(cause: KubernetesClientException): Unit = {} - - def waitUntilReady(): Boolean = signal.get(30, TimeUnit.SECONDS) - } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/SparkReadinessWatcher.scala new file mode 100644 index 0000000000000..20517eb2fc2a6 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/SparkReadinessWatcher.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] { + + private val signal = SettableFuture.create[Boolean] + + override def eventReceived(action: Action, resource: T): Unit = { + if ((action == Action.MODIFIED || action == Action.ADDED) && + Readiness.isReady(resource)) { + signal.set(true) + } + } + + override def onClose(cause: KubernetesClientException): Unit = {} + + def waitUntilReady(): Boolean = signal.get(30, TimeUnit.SECONDS) +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/StaticAssetServerLauncher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/StaticAssetServerLauncher.scala new file mode 100644 index 0000000000000..6b483769f5254 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/StaticAssetServerLauncher.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import io.fabric8.kubernetes.api.model.{HTTPGetActionBuilder, Pod} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.util.Utils + +/** + * Launches a simple HTTP server which provides jars that can be downloaded by Spark applications + * in integration tests. + */ +private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClient) { + + // Returns the HTTP Base URI of the server. 
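+  // The server runs as a single pod whose readiness is detected through an HTTP GET
+  // probe on port 8080; the returned URI is built from the pod IP, so it is assumed
+  // to be reachable from other pods in the cluster (e.g. driver init-containers
+  // downloading spark.jars), though not necessarily from outside the cluster.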
+ def launchStaticAssetServer(): String = { + val readinessWatcher = new SparkReadinessWatcher[Pod] + val probePingHttpGet = new HTTPGetActionBuilder() + .withNewPort(8080) + .withScheme("HTTP") + .withPath("/") + .build() + Utils.tryWithResource(kubernetesClient + .pods() + .withName("integration-test-static-assets") + .watch(readinessWatcher)) { _ => + val pod = kubernetesClient.pods().createNew() + .withNewMetadata() + .withName("integration-test-static-assets") + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("static-asset-server-container") + .withImage("spark-integration-test-asset-server:latest") + .withImagePullPolicy("IfNotPresent") + .withNewReadinessProbe() + .withHttpGet(probePingHttpGet) + .endReadinessProbe() + .endContainer() + .endSpec() + .done() + readinessWatcher.waitUntilReady() + val podIP = kubernetesClient.pods().withName(pod.getMetadata.getName).get() + .getStatus + .getPodIP + s"http://$podIP:8080" + } + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index d807c4d81009b..04bd0f084c34d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -33,6 +33,8 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" private val DRIVER_INIT_DOCKER_FILE = "dockerfiles/driver-init/Dockerfile" private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" + private val STATIC_ASSET_SERVER_DOCKER_FILE = + "dockerfiles/integration-test-asset-server/Dockerfile" private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", @@ -63,6 +65,7 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, buildImage("spark-driver-v2", DRIVER_V2_DOCKER_FILE) buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) buildImage("spark-driver-init", DRIVER_INIT_DOCKER_FILE) + buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) } private def buildImage(name: String, dockerFile: String): Unit = {