From 2ca41c2c09fa18165fc76443b178319dc18cb8c7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 27 Apr 2017 12:18:06 -0700 Subject: [PATCH] Use the dependency download init-containers on executors. --- .../spark/deploy/kubernetes/config.scala | 125 +++++++++++++----- .../spark/deploy/kubernetes/constants.scala | 43 ++---- .../deploy/kubernetes/submit/v2/Client.scala | 36 +++-- .../v2/DownloadRemoteDependencyManager.scala | 81 ++++-------- ...nloadRemoteDependencyManagerProvider.scala | 4 +- .../v2/SparkPodInitContainerBootstrap.scala | 121 +++++++++++++++++ .../v2/SubmittedDependencyManagerImpl.scala | 111 ++++++---------- .../SubmittedDependencyManagerProvider.scala | 6 +- ...SparkDependencyDownloadInitContainer.scala | 10 +- ...cutorInitContainerBootstrapsProvider.scala | 115 ++++++++++++++++ .../kubernetes/KubernetesClusterManager.scala | 4 +- .../KubernetesClusterSchedulerBackend.scala | 38 ++++-- .../kubernetes/submit/v2/ClientV2Suite.scala | 113 +++++++++++----- ...DownloadRemoteDependencyManagerSuite.scala | 75 +---------- .../v2/SubmittedDependencyManagerSuite.scala | 95 +------------ ...DependencyDownloadInitContainerSuite.scala | 8 +- .../src/main/docker/executor/Dockerfile | 5 +- 17 files changed, 562 insertions(+), 428 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SparkPodInitContainerBootstrap.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorInitContainerBootstrapsProvider.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 172d15925c26b..4c36716f6fb19 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -365,14 +365,14 @@ package object config extends Logging { .createOptional private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER = - ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier") + ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsResourceIdentifier") .doc("Identifier for the jars tarball that was uploaded to the staging service.") .internal() .stringConf .createOptional private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION = - ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation") + ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsSecretLocation") .doc("Location of the application secret to use when the init-container contacts the" + " resource staging server to download jars.") .internal() @@ -380,14 +380,14 @@ package object config extends Logging { .createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH) private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER = - ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier") + ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesResourceIdentifier") .doc("Identifier for the files tarball that was uploaded to the staging service.") .internal() .stringConf .createOptional private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION = - ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation") + 
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesSecretLocation") .doc("Location of the application secret to use when the init-container contacts the" + " resource staging server to download files.") .internal() @@ -395,7 +395,7 @@ package object config extends Logging { .createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH) private[spark] val INIT_CONTAINER_REMOTE_JARS = - ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteJars") + ConfigBuilder("spark.kubernetes.initcontainer.remoteJars") .doc("Comma-separated list of jar URIs to download in the init-container. This is inferred" + " from spark.jars.") .internal() @@ -403,7 +403,7 @@ package object config extends Logging { .createOptional private[spark] val INIT_CONTAINER_REMOTE_FILES = - ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteFiles") + ConfigBuilder("spark.kubernetes.initcontainer.remoteFiles") .doc("Comma-separated list of file URIs to download in the init-container. This is inferred" + " from spark.files.") .internal() @@ -411,50 +411,109 @@ package object config extends Logging { .createOptional private[spark] val INIT_CONTAINER_DOCKER_IMAGE = - ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image") - .doc("Image for the driver's init-container that downloads mounted dependencies.") + ConfigBuilder("spark.kubernetes.initcontainer.docker.image") + .doc("Image for the driver and executor's init-container that downloads dependencies.") .stringConf - .createWithDefault(s"spark-driver-init:$sparkVersion") + .createWithDefault(s"spark-init:$sparkVersion") - private[spark] val DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedJars.downloadDir") - .doc("Location to download local jars to in the driver. When using spark-submit, this" + - " directory must be empty and will be mounted as an empty directory volume on the" + - " driver pod.") + private[spark] val SUBMITTED_JARS_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.mountdependencies.submittedJars.downloadDir") + .doc("Location to download local jars to in the driver and executors. When using" + + " spark-submit, this directory must be empty and will be mounted as an empty directory" + + " volume on the driver and executor pod.") .stringConf - .createWithDefault("/var/spark-data/spark-local-jars") + .createWithDefault("/var/spark-data/spark-submitted-jars") - private[spark] val DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedFiles.downloadDir") - .doc("Location to download local files to in the driver. When using spark-submit, this" + - " directory must be empty and will be mounted as an empty directory volume on the" + - " driver pod.") + private[spark] val SUBMITTED_FILES_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.mountdependencies.submittedFiles.downloadDir") + .doc("Location to download submitted files to in the driver and executors. When using" + + " spark-submit, this directory must be empty and will be mounted as an empty directory" + + " volume on the driver and executor pods.") .stringConf - .createWithDefault("/var/spark-data/spark-local-files") + .createWithDefault("/var/spark-data/spark-submitted-files") - private[spark] val DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteJars.downloadDir") - .doc("Location to download remotely-located (e.g. HDFS) jars to in the driver. 
When" + - " using spark-submit, this directory must be empty and will be mounted as an empty" + - " directory volume on the driver pod.") + private[spark] val REMOTE_JARS_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.mountdependencies.remoteJars.downloadDir") + .doc("Location to download remotely-located (e.g. HDFS) jars to in the driver and" + + " executors. When using spark-submit, this directory must be empty and will be" + + " mounted as an empty directory volume on the driver and executor pods.") .stringConf .createWithDefault("/var/spark-data/spark-remote-jars") - private[spark] val DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION = - ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteFiles.downloadDir") - .doc("Location to download remotely-located (e.g. HDFS) files to in the driver. When" + - " using spark-submit, this directory must be empty and will be mounted as an empty" + - " directory volume on the driver pod.") + private[spark] val REMOTE_FILES_DOWNLOAD_LOCATION = + ConfigBuilder("spark.kubernetes.mountdependencies.remoteFiles.downloadDir") + .doc("Location to download remotely-located (e.g. HDFS) files to in the driver and" + + " executors. When using spark-submit, this directory must be empty and will be mounted" + + " as an empty directory volume on the driver and executor pods.") .stringConf .createWithDefault("/var/spark-data/spark-remote-files") - private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT = + private[spark] val MOUNT_DEPENDENCIES_INIT_TIMEOUT = ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout") .doc("Timeout before aborting the attempt to download and unpack local dependencies from" + - " the dependency staging server when initializing the driver pod.") + " remote locations and the resource etaging server when initializing the driver and" + + " executor pods.") .timeConf(TimeUnit.MINUTES) .createWithDefault(5) + private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP = + ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.configmapname") + .doc("Name of the config map to use in the init-container that retrieves submitted files" + + " for the executor.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY = + ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.configmapkey") + .doc("Key for the entry in the init container config map for submitted files that" + + " corresponds to the properties for this init-container.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET = + ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.stagingServerSecret") + .doc("Name of the secret to mount into the init-container that retrieves submitted files.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET_DIR = + ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.stagingServerSecretDir") + .doc("Directory to mount the executor's init container secret for retrieving submitted" + + " files.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP = + ConfigBuilder("spark.kubernetes.initcontainer.executor.remoteFiles.configmapname") + .doc("Name of the config map to use in the init-container that retrieves remote files" + + " for the executor.") + .internal() 
+ .stringConf + .createOptional + + private[spark] val EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY = + ConfigBuilder("spark.kubernetes.initcontainer.executor.remoteFiles.configmapkey") + .doc("Key for the entry in the init container config map for remote files that" + + " corresponds to the properties for this init-container.") + .internal() + .stringConf + .createOptional + + private[spark] val EXECUTOR_RESOLVED_MOUNTED_CLASSPATH = + ConfigBuilder("spark.kubernetes.executor.resolvedMountedClasspath") + .doc("Expected resolved classpath after the executor's init-containers download" + + " dependencies from the resource staging server and from remote locations, if" + + " applicable. The submission client determines this assuming that the executors will" + + " download the dependencies in the same way that the driver does.") + .internal() + .stringConf + .toSequence + .createWithDefault(Seq.empty[String]) + private[spark] def resolveK8sMaster(rawMasterString: String): String = { if (!rawMasterString.startsWith("k8s://")) { throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index b8f118955d5db..93d7c03a9f4c3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -87,6 +87,7 @@ package object constants { private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY" private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH" + private[spark] val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH" private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH" private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS" private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS" @@ -107,12 +108,6 @@ package object constants { // V2 submission init container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" - - // Init container for downloading submitted files from the staging server.
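To make the intended flow of these internal settings concrete, here is a minimal sketch (not part of the patch) of how the submission client would record the executor init-container coordinates and how the scheduler backend would read them back. The config map name, key, and classpath entries are placeholder values, and the snippet assumes Spark-internal access to the SparkConf.set/get overloads that take config entries, as this patch's own code uses.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Submission-client side: record where the executor init-container properties live and what
// the executor classpath is expected to resolve to once the init-containers finish downloading.
val conf = new SparkConf(false)
  .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP, "app-1234-init-config")
  .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY, "download-submitted-files")
  .set(EXECUTOR_RESOLVED_MOUNTED_CLASSPATH, Seq(
    "/var/spark-data/spark-submitted-jars/app.jar",
    "/var/spark-data/spark-remote-jars/dep.jar"))

// Scheduler-backend side: read the same entries back while building executor pods.
val maybeConfigMap: Option[String] = conf.get(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP)
val mountedClasspath: Seq[String] = conf.get(EXECUTOR_RESOLVED_MOUNTED_CLASSPATH)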
- private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME = - "spark-driver-download-submitted-files" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME = - "resource-staging-server-secret" private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH = "/mnt/secrets/spark-init" private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY = @@ -131,35 +126,13 @@ package object constants { s"$INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY" private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY = "download-submitted-files" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME = - "download-submitted-files-properties" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH = - "/etc/spark-init/download-submitted-files" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME = - "init-driver-download-submitted-files.properties" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_PATH = - s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH/" + - s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME = - "download-submitted-jars" - private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME = - "download-submitted-files" - - // Init container for fetching remote dependencies. - private[spark] val INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME = - "spark-driver-download-remote-files" private[spark] val INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY = "download-remote-files" - private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME = - "download-remote-files-properties" - private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH = - "/etc/spark-init/download-remote-files" - private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME = - "init-driver-download-remote-files.properties" - private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH = - s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH/" + - s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME" - private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME = "download-remote-jars" - private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME = - "download-remote-files" + private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SUFFIX = "staged-files" + private[spark] val INIT_CONTAINER_REMOTE_FILES_SUFFIX = "remote-files" + private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume" + private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files" + private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties" + private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties" + private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala index 13ea5e360fbb0..e4ab25d686dc9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala @@ -156,11 +156,11 @@ private[spark] class Client( // If the resource staging server isn't being used, then 
resolvedJars = spark.jars. val resolvedJars = mutable.Buffer[String]() val resolvedFiles = mutable.Buffer[String]() - val driverPodWithMountedDeps = maybeStagingServerUri.map { stagingServerUri => + val (resolvedSparkConf, driverPodWithMountedDeps) = maybeStagingServerUri.map { stagingUri => val submittedDependencyManager = submittedDependencyManagerProvider .getSubmittedDependencyManager( kubernetesAppId, - stagingServerUri, + stagingUri, allLabels, namespace, sparkJars, @@ -175,11 +175,16 @@ private[spark] class Client( resolvedFiles ++= submittedDependencyManager.resolveSparkFiles() nonDriverPodKubernetesResources += initContainerKubernetesSecret nonDriverPodKubernetesResources += initContainerConfigMap - submittedDependencyManager.configurePodToMountLocalDependencies( - driverContainer.getName, - initContainerKubernetesSecret, - initContainerConfigMap, - driverPodWithMountedDriverKubernetesCredentials) + val bootstrappedPod = submittedDependencyManager.getInitContainerBootstrap( + initContainerKubernetesSecret, initContainerConfigMap) + .bootstrapInitContainerAndVolumes( + driverContainer.getName, driverPodWithMountedDriverKubernetesCredentials) + val sparkConfWithExecutorInitContainer = submittedDependencyManager + .configureExecutorsToFetchSubmittedDependencies( + sparkConfWithDriverPodKubernetesCredentialLocations, + initContainerConfigMap, + initContainerKubernetesSecret) + (sparkConfWithExecutorInitContainer, bootstrappedPod) }.getOrElse { sparkJars.map(Utils.resolveURI).foreach { jar => require(Option.apply(jar.getScheme).getOrElse("file") != "file", @@ -193,9 +198,9 @@ private[spark] class Client( } resolvedJars ++= sparkJars resolvedFiles ++= sparkFiles - driverPodWithMountedDriverKubernetesCredentials + (sparkConfWithDriverPodKubernetesCredentialLocations.clone(), + driverPodWithMountedDriverKubernetesCredentials) } - val resolvedSparkConf = sparkConfWithDriverPodKubernetesCredentialLocations.clone() if (resolvedJars.nonEmpty) { resolvedSparkConf.set("spark.jars", resolvedJars.mkString(",")) } @@ -215,8 +220,11 @@ private[spark] class Client( .buildInitContainerConfigMap() nonDriverPodKubernetesResources += downloadRemoteDependenciesConfigMap val driverPodWithMountedAndDownloadedDeps = remoteDependencyManager - .configurePodToDownloadRemoteDependencies( - downloadRemoteDependenciesConfigMap, driverContainer.getName, driverPodWithMountedDeps) + .getInitContainerBootstrap(downloadRemoteDependenciesConfigMap) + .bootstrapInitContainerAndVolumes(driverContainer.getName, driverPodWithMountedDeps) + val sparkConfExecutorsFetchRemoteDeps = remoteDependencyManager + .configureExecutorsToFetchRemoteDependencies( + resolvedSparkConf, downloadRemoteDependenciesConfigMap) // The resolved local classpath should *only* contain local file URIs. 
It consists of the // driver's classpath (minus spark.driver.extraClassPath which was handled above) with the @@ -227,8 +235,10 @@ private[spark] class Client( resolvedLocalClassPath.foreach { classPathEntry => require(Option(URI.create(classPathEntry).getScheme).isEmpty) } - val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) => - s"-D$confKey=$confValue" + sparkConfExecutorsFetchRemoteDeps.set( + EXECUTOR_RESOLVED_MOUNTED_CLASSPATH, resolvedLocalClassPath) + val resolvedDriverJavaOpts = sparkConfExecutorsFetchRemoteDeps.getAll.map { + case (confKey, confValue) => s"-D$confKey=$confValue" }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") val resolvedDriverPod = driverPodWithMountedAndDownloadedDeps.editSpec() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala index d346c971ad06b..f19027c203daf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManager.scala @@ -18,8 +18,9 @@ package org.apache.spark.deploy.kubernetes.submit.v2 import java.io.File -import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.ConfigMap +import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils @@ -29,15 +30,15 @@ private[spark] trait DownloadRemoteDependencyManager { def buildInitContainerConfigMap(): ConfigMap - def configurePodToDownloadRemoteDependencies( - initContainerConfigMap: ConfigMap, - driverContainerName: String, - originalPodSpec: PodBuilder): PodBuilder + def getInitContainerBootstrap(initContainerConfigMap: ConfigMap): SparkPodInitContainerBootstrap /** * Return the local classpath of the driver after all of its dependencies have been downloaded. */ def resolveLocalClasspath(): Seq[String] + + def configureExecutorsToFetchRemoteDependencies( + sparkConf: SparkConf, initContainerConfigMap: ConfigMap): SparkConf } // TODO this is very similar to SubmittedDependencyManagerImpl. 
We should consider finding a way to @@ -65,8 +66,8 @@ private[spark] class DownloadRemoteDependencyManagerImpl( Map.empty[String, String] } val initContainerConfig = Map[String, String]( - DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, - DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++ + REMOTE_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, + REMOTE_FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++ remoteJarsConf ++ remoteFilesConf PropertiesConfigMapFromScalaMapBuilder.buildConfigMap( @@ -75,54 +76,15 @@ private[spark] class DownloadRemoteDependencyManagerImpl( initContainerConfig) } - override def configurePodToDownloadRemoteDependencies( - initContainerConfigMap: ConfigMap, - driverContainerName: String, - originalPodSpec: PodBuilder): PodBuilder = { - val sharedVolumeMounts = Seq( - new VolumeMountBuilder() - .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME) - .withMountPath(jarsDownloadPath) - .build(), - new VolumeMountBuilder() - .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME) - .withMountPath(filesDownloadPath) - .build()) - val initContainer = new ContainerBuilder() - .withName(INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME) - .withArgs(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH) - .addNewVolumeMount() - .withName(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME) - .withMountPath(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH) - .endVolumeMount() - .addToVolumeMounts(sharedVolumeMounts: _*) - .withImage(initContainerImage) - .withImagePullPolicy("IfNotPresent") - .build() - InitContainerUtil.appendInitContainer(originalPodSpec, initContainer) - .editSpec() - .addNewVolume() - .withName(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME) - .withNewConfigMap() - .withName(initContainerConfigMap.getMetadata.getName) - .addNewItem() - .withKey(INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) - .withPath(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME) - .endItem() - .endConfigMap() - .endVolume() - .addNewVolume() - .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME) - .withEmptyDir(new EmptyDirVolumeSource()) - .endVolume() - .addNewVolume() - .withName(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME) - .withEmptyDir(new EmptyDirVolumeSource()) - .endVolume() - .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName)) - .addToVolumeMounts(sharedVolumeMounts: _*) - .endContainer() - .endSpec() + override def getInitContainerBootstrap(initContainerConfigMap: ConfigMap) + : SparkPodInitContainerBootstrap = { + new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_REMOTE_FILES_SUFFIX, + initContainerConfigMap.getMetadata.getName, + INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY, + initContainerImage, + jarsDownloadPath, + filesDownloadPath) } override def resolveLocalClasspath(): Seq[String] = { @@ -137,4 +99,13 @@ private[spark] class DownloadRemoteDependencyManagerImpl( } } } + + override def configureExecutorsToFetchRemoteDependencies( + sparkConf: SparkConf, initContainerConfigMap: ConfigMap): SparkConf = { + sparkConf.clone() + .set(EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP, + initContainerConfigMap.getMetadata.getName) + .set(EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY, + INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala index b5cc4c24d582d..ad077e14ce032 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerProvider.scala @@ -37,8 +37,8 @@ private[spark] class DownloadRemoteDependencyManagerProviderImpl( kubernetesAppId, sparkJars, sparkFiles, - sparkConf.get(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION), - sparkConf.get(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION), + sparkConf.get(REMOTE_JARS_DOWNLOAD_LOCATION), + sparkConf.get(REMOTE_FILES_DOWNLOAD_LOCATION), sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SparkPodInitContainerBootstrap.scala new file mode 100644 index 0000000000000..9a408a3d7269d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SparkPodInitContainerBootstrap.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeBuilder, VolumeMount, VolumeMountBuilder} + +import org.apache.spark.deploy.kubernetes.constants._ + +private[spark] case class InitContainerSecretConfiguration( + secretName: String, + secretMountPath: String) + +private[spark] trait SparkPodInitContainerBootstrap { + + /** + * Bootstraps an init-container that downloads dependencies to be used by a main container. + * Note that this primarily assumes that the init-container's configuration is being provided + * by a Config Map that was installed by some other component; that is, the implementation + * here makes no assumptions about how the init-container is specifically configured. For + * example, this class is unaware if the init-container is fetching remote dependencies or if + * it is fetching dependencies from a resource staging server. 
+ */ + def bootstrapInitContainerAndVolumes( + mainContainerName: String, + originalPodSpec: PodBuilder): PodBuilder +} + +private[spark] class SparkPodInitContainerBootstrapImpl( + initContainerComponentsSuffix: String, + initContainerConfigMapName: String, + initContainerConfigMapKey: String, + initContainerImage: String, + jarsDownloadPath: String, + filesDownloadPath: String, + initContainerSecret: Option[InitContainerSecretConfiguration] = None) + extends SparkPodInitContainerBootstrap { + override def bootstrapInitContainerAndVolumes( + mainContainerName: String, + originalPodSpec: PodBuilder): PodBuilder = { + val jarsVolumeName = s"$INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME-$initContainerComponentsSuffix" + val filesVolumeName = + s"$INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME-$initContainerComponentsSuffix" + val sharedVolumeMounts = Seq[VolumeMount]( + new VolumeMountBuilder() + .withName(jarsVolumeName) + .withMountPath(jarsDownloadPath) + .build(), + new VolumeMountBuilder() + .withName(filesVolumeName) + .withMountPath(filesDownloadPath) + .build()) + + val propertiesFileDirectory = s"/etc/spark-init/init-$initContainerComponentsSuffix" + val propertiesFilePath = s"$propertiesFileDirectory/$INIT_CONTAINER_PROPERTIES_FILE_NAME" + val initContainerSecretVolume = initContainerSecret.map { secretNameAndMountPath => + new VolumeBuilder() + .withName(s"$INIT_CONTAINER_SECRET_VOLUME_NAME-$initContainerComponentsSuffix") + .withNewSecret() + .withSecretName(secretNameAndMountPath.secretName) + .endSecret() + .build() + } + val initContainerSecretVolumeMount = initContainerSecret.map { secretNameAndMountPath => + new VolumeMountBuilder() + .withName(s"$INIT_CONTAINER_SECRET_VOLUME_NAME-$initContainerComponentsSuffix") + .withMountPath(secretNameAndMountPath.secretMountPath) + .build() + } + val initContainer = new ContainerBuilder() + .withName(s"spark-init-$initContainerComponentsSuffix") + .withImage(initContainerImage) + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName(s"$INIT_CONTAINER_PROPERTIES_FILE_VOLUME-$initContainerComponentsSuffix") + .withMountPath(propertiesFileDirectory) + .endVolumeMount() + .addToVolumeMounts(initContainerSecretVolumeMount.toSeq: _*) + .addToVolumeMounts(sharedVolumeMounts: _*) + .addToArgs(propertiesFilePath) + .build() + InitContainerUtil.appendInitContainer(originalPodSpec, initContainer) + .editSpec() + .addNewVolume() + .withName(s"$INIT_CONTAINER_PROPERTIES_FILE_VOLUME-$initContainerComponentsSuffix") + .withNewConfigMap() + .withName(initContainerConfigMapName) + .addNewItem() + .withKey(initContainerConfigMapKey) + .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .addNewVolume() + .withName(jarsVolumeName) + .withEmptyDir(new EmptyDirVolumeSource()) + .endVolume() + .addNewVolume() + .withName(filesVolumeName) + .withEmptyDir(new EmptyDirVolumeSource()) + .endVolume() + .addToVolumes(initContainerSecretVolume.toSeq: _*) + .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName)) + .addToVolumeMounts(sharedVolumeMounts: _*) + .endContainer() + .endSpec() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala index da9c8a8b348fa..0f27d703bd155 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerImpl.scala @@ -23,13 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} -import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EmptyDirVolumeSource, PodBuilder, Secret, SecretBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.{ConfigMap, Secret, SecretBuilder} import okhttp3.RequestBody import retrofit2.Call import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.{SparkException, SSLOptions} +import org.apache.spark.{SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.kubernetes.CompressionUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -52,11 +52,9 @@ private[spark] trait SubmittedDependencyManager { */ def uploadFiles(): StagedResourceIdentifier - def configurePodToMountLocalDependencies( - driverContainerName: String, + def getInitContainerBootstrap( initContainerSecret: Secret, - initContainerConfigMap: ConfigMap, - originalPodSpec: PodBuilder): PodBuilder + initContainerConfigMap: ConfigMap): SparkPodInitContainerBootstrap def buildInitContainerSecret(jarsSecret: String, filesSecret: String): Secret @@ -74,6 +72,15 @@ private[spark] trait SubmittedDependencyManager { * the locations they will be downloaded to on the driver's disk. */ def resolveSparkFiles(): Seq[String] + + /** + * Adjusts the Spark configuration such that the scheduler backend will configure + * executor pods to attach an init-container that fetches these submitted dependencies. 
+ */ + def configureExecutorsToFetchSubmittedDependencies( + sparkConf: SparkConf, + initContainerConfigMap: ConfigMap, + initContainerSecret: Secret): SparkConf } /** @@ -138,66 +145,19 @@ private[spark] class SubmittedDependencyManagerImpl( getTypedResponseResult(uploadResponse) } - override def configurePodToMountLocalDependencies( - driverContainerName: String, + override def getInitContainerBootstrap( initContainerSecret: Secret, - initContainerConfigMap: ConfigMap, - originalPodSpec: PodBuilder): PodBuilder = { - val sharedVolumeMounts = Seq[VolumeMount]( - new VolumeMountBuilder() - .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) - .withMountPath(jarsDownloadPath) - .build(), - new VolumeMountBuilder() - .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) - .withMountPath(filesDownloadPath) - .build()) - - val initContainer = new ContainerBuilder() - .withName(INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME) - .withImage(initContainerImage) - .withImagePullPolicy("IfNotPresent") - .addNewVolumeMount() - .withName(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME) - .withMountPath(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH) - .endVolumeMount() - .addNewVolumeMount() - .withName(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) - .withMountPath(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH) - .endVolumeMount() - .addToVolumeMounts(sharedVolumeMounts: _*) - .addToArgs(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_PATH) - .build() - InitContainerUtil.appendInitContainer(originalPodSpec, initContainer) - .editSpec() - .addNewVolume() - .withName(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME) - .withNewConfigMap() - .withName(initContainerConfigMap.getMetadata.getName) - .addNewItem() - .withKey(INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY) - .withPath(INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME) - .endItem() - .endConfigMap() - .endVolume() - .addNewVolume() - .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) - .withEmptyDir(new EmptyDirVolumeSource()) - .endVolume() - .addNewVolume() - .withName(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) - .withEmptyDir(new EmptyDirVolumeSource()) - .endVolume() - .addNewVolume() - .withName(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) - .withNewSecret() - .withSecretName(initContainerSecret.getMetadata.getName) - .endSecret() - .endVolume() - .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName)) - .addToVolumeMounts(sharedVolumeMounts: _*) - .endContainer() - .endSpec() + initContainerConfigMap: ConfigMap): SparkPodInitContainerBootstrap = { + new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_SUBMITTED_FILES_SUFFIX, + initContainerConfigMap.getMetadata.getName, + INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY, + initContainerImage, + jarsDownloadPath, + filesDownloadPath, + Some(InitContainerSecretConfiguration( + secretName = initContainerSecret.getMetadata.getName, + secretMountPath = INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH))) } override def buildInitContainerSecret(jarsSecret: String, filesSecret: String): Secret = { @@ -226,15 +186,15 @@ private[spark] class SubmittedDependencyManagerImpl( jarsResourceId: String, filesResourceId: String): ConfigMap = { val initContainerConfig = Map[String, String]( RESOURCE_STAGING_SERVER_URI.key -> stagingServerUri, - DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, - DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> 
filesDownloadPath, + SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath, + SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath, INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId, INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key -> INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH, INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId, INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key -> INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH, - DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${downloadTimeoutMinutes}m", + MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${downloadTimeoutMinutes}m", RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> stagingServiceSslOptions.enabled.toString) ++ stagingServiceSslOptions.trustStore.map { _ => (RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key, @@ -256,6 +216,21 @@ private[spark] class SubmittedDependencyManagerImpl( override def resolveSparkFiles(): Seq[String] = resolveLocalFiles(sparkFiles, filesDownloadPath) + override def configureExecutorsToFetchSubmittedDependencies( + sparkConf: SparkConf, + initContainerConfigMap: ConfigMap, + initContainerSecret: Secret): SparkConf = { + sparkConf.clone() + .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP, + initContainerConfigMap.getMetadata.getName) + .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY, + INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY) + .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET, + initContainerSecret.getMetadata.getName) + .set(EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET_DIR, + INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH) + } + private def resolveLocalFiles( allFileUriStrings: Seq[String], localDownloadRoot: String): Seq[String] = { val usedLocalFileNames = mutable.HashSet.empty[String] diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala index 799fbb1b26c43..f89a6ca25e0b7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerProvider.scala @@ -47,9 +47,9 @@ private[spark] class SubmittedDependencyManagerProviderImpl(sparkConf: SparkConf podNamespace, stagingServerUri, sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE), - sparkConf.get(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION), - sparkConf.get(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION), - sparkConf.get(DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT), + sparkConf.get(SUBMITTED_JARS_DOWNLOAD_LOCATION), + sparkConf.get(SUBMITTED_FILES_DOWNLOAD_LOCATION), + sparkConf.get(MOUNT_DEPENDENCIES_INIT_TIMEOUT), sparkJars, sparkFiles, resourceStagingServerSslOptions, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala index af6022f9e1465..fcb3d6274b456 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala @@ -111,18 +111,18 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer( sparkConf.get(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION)) private val stagingServerJarsDownloadDir = new File( - sparkConf.get(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION)) + sparkConf.get(SUBMITTED_JARS_DOWNLOAD_LOCATION)) private val stagingServerFilesDownloadDir = new File( - sparkConf.get(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION)) + sparkConf.get(SUBMITTED_FILES_DOWNLOAD_LOCATION)) private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS) private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES) private val remoteJarsDownloadDir = new File( - sparkConf.get(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION)) + sparkConf.get(REMOTE_JARS_DOWNLOAD_LOCATION)) private val remoteFilesDownloadDir = new File( - sparkConf.get(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION)) + sparkConf.get(REMOTE_FILES_DOWNLOAD_LOCATION)) - private val downloadTimeoutMinutes = sparkConf.get(DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT) + private val downloadTimeoutMinutes = sparkConf.get(MOUNT_DEPENDENCIES_INIT_TIMEOUT) def run(): Unit = { val resourceStagingServerJarsDownload = Future[Unit] { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorInitContainerBootstrapsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorInitContainerBootstrapsProvider.scala new file mode 100644 index 0000000000000..0d5f493816bdb --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorInitContainerBootstrapsProvider.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.v2.{InitContainerSecretConfiguration, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.internal.config.OptionalConfigEntry +/** + * Adds init-containers to the executor pods that download resources from remote locations and the + * resource staging server, if applicable.
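To show how the bootstrap introduced earlier composes, here is a hedged sketch (not part of the patch) of an executor pod spec being run through both init-container bootstraps in sequence, mirroring the loop the scheduler backend performs. The pod name, image tags, config map name, and secret name are illustrative placeholders only.

import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v2.{InitContainerSecretConfiguration, SparkPodInitContainerBootstrapImpl}

// A bare executor pod with a single main container named "executor".
val basePod = new PodBuilder()
  .withNewMetadata().withName("exec-1").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("executor").withImage("spark-executor:2.1.0").endContainer()
  .endSpec()

// One bootstrap per dependency source; each appends an init-container plus shared emptyDir volumes.
val bootstraps = Seq(
  new SparkPodInitContainerBootstrapImpl(
    INIT_CONTAINER_SUBMITTED_FILES_SUFFIX,
    "app-1234-init-config",                        // hypothetical config map name
    INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY,
    "spark-init:2.1.0",
    "/var/spark-data/spark-submitted-jars",
    "/var/spark-data/spark-submitted-files",
    Some(InitContainerSecretConfiguration(
      secretName = "app-1234-init-secret",         // hypothetical secret name
      secretMountPath = "/mnt/secrets/spark-init"))),
  new SparkPodInitContainerBootstrapImpl(
    INIT_CONTAINER_REMOTE_FILES_SUFFIX,
    "app-1234-init-config",
    INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY,
    "spark-init:2.1.0",
    "/var/spark-data/spark-remote-jars",
    "/var/spark-data/spark-remote-files"))

// Each bootstrap edits the builder in turn; the result carries two init-containers, the download
// volumes, and the matching volume mounts on the "executor" container.
val bootstrappedPod = bootstraps.foldLeft(basePod) { (pod, bootstrap) =>
  bootstrap.bootstrapInitContainerAndVolumes("executor", pod)
}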
+ */ +private[spark] trait ExecutorInitContainerBootstrapsProvider { + def getInitContainerBootstraps: Iterable[SparkPodInitContainerBootstrap] + +} + +private[spark] class ExecutorInitContainerBootstrapsProviderImpl(sparkConf: SparkConf) + extends ExecutorInitContainerBootstrapsProvider { + import ExecutorInitContainerBootstrapsProvider._ + private val submittedJarsDownloadPath = sparkConf.get(SUBMITTED_JARS_DOWNLOAD_LOCATION) + private val submittedFilesDownloadPath = sparkConf.get(SUBMITTED_FILES_DOWNLOAD_LOCATION) + private val submittedFilesInitContainerConfigMap = sparkConf.get( + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP) + private val submittedFilesInitContainerConfigMapKey = sparkConf.get( + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY) + private val submittedFilesInitContainerSecret = sparkConf.get( + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET) + private val submittedFilesInitContainerSecretDir = sparkConf.get( + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET_DIR) + + requireAllOrNoneDefined(Seq( + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP, + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY, + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET, + EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET_DIR), + submittedFilesInitContainerConfigMap, + submittedFilesInitContainerConfigMapKey, + submittedFilesInitContainerSecret, + submittedFilesInitContainerSecretDir) + + private val remoteJarsDownloadPath = sparkConf.get(REMOTE_JARS_DOWNLOAD_LOCATION) + private val remoteFilesDownloadPath = sparkConf.get(REMOTE_FILES_DOWNLOAD_LOCATION) + private val remoteFilesInitContainerConfigMap = sparkConf.get( + EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP) + private val remoteFilesInitContainerConfigMapKey = sparkConf.get( + EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) + + requireAllOrNoneDefined(Seq( + EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP, + EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY), + remoteFilesInitContainerConfigMap, + remoteFilesInitContainerConfigMapKey) + + private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) + + override def getInitContainerBootstraps: Iterable[SparkPodInitContainerBootstrap] = { + val submittedFileBootstrap = for { + configMap <- submittedFilesInitContainerConfigMap + configMapKey <- submittedFilesInitContainerConfigMapKey + secret <- submittedFilesInitContainerSecret + secretDir <- submittedFilesInitContainerSecretDir + } yield { + new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_SUBMITTED_FILES_SUFFIX, + configMap, + configMapKey, + initContainerImage, + submittedJarsDownloadPath, + submittedFilesDownloadPath, + Some(InitContainerSecretConfiguration(secretName = secret, secretMountPath = secretDir))) + } + + val remoteFilesBootstrap = for { + configMap <- remoteFilesInitContainerConfigMap + configMapKey <- remoteFilesInitContainerConfigMapKey + } yield { + new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_REMOTE_FILES_SUFFIX, + configMap, + configMapKey, + initContainerImage, + remoteJarsDownloadPath, + remoteFilesDownloadPath) + } + submittedFileBootstrap.toSeq ++ remoteFilesBootstrap.toSeq + } +} + +private object ExecutorInitContainerBootstrapsProvider { + def requireAllOrNoneDefined( + configEntries: Seq[OptionalConfigEntry[_]], opts: Option[_]*): Unit = { + if (opts.exists(_.isDefined)) { + val errorMessage = s"If any of the following configurations are defined, all of 
the others" + + s" must also be provided: ${configEntries.map(_.key).mkString(",")}" + require(opts.forall(_.isDefined), errorMessage) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala index 36f7149a832c3..ae1a628cfd9ad 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala @@ -31,7 +31,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { - new KubernetesClusterSchedulerBackend(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc) + val bootstrapsProvider = new ExecutorInitContainerBootstrapsProviderImpl(sc.getConf) + new KubernetesClusterSchedulerBackend( + sc.taskScheduler.asInstanceOf[TaskSchedulerImpl], sc, bootstrapsProvider) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 15457db7e1459..edc5054574051 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.scheduler.cluster.kubernetes +import java.io.File import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, - EnvVarSourceBuilder, Pod, QuantityBuilder} +import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -33,7 +33,8 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( scheduler: TaskSchedulerImpl, - val sc: SparkContext) + val sc: SparkContext, + executorInitContainerBootstrapsProvider: ExecutorInitContainerBootstrapsProvider) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { import KubernetesClusterSchedulerBackend._ @@ -41,6 +42,9 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_MODIFICATION_LOCK = new Object private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] + private val executorExtraClasspath = conf.get( + org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorResolvedMountedClasspath = conf.get(EXECUTOR_RESOLVED_MOUNTED_CLASSPATH) private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) @@ -64,6 +68,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val 
executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") + private val executorBootstraps = executorInitContainerBootstrapsProvider + .getInitContainerBootstraps private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) @@ -164,13 +170,21 @@ private[spark] class KubernetesClusterSchedulerBackend( val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores) .build() + val executorExtraClasspathEnv = executorExtraClasspath.map { cp => + new EnvVarBuilder() + .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) + .withValue(cp) + .build() + } val requiredEnv = Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), (ENV_EXECUTOR_CORES, executorCores), (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId()), - (ENV_EXECUTOR_ID, executorId)) + (ENV_EXECUTOR_ID, executorId), + (ENV_MOUNTED_CLASSPATH, + executorResolvedMountedClasspath.mkString(File.pathSeparator))) .map(env => new EnvVarBuilder() .withName(env._1) .withValue(env._2) @@ -193,19 +207,19 @@ private[spark] class KubernetesClusterSchedulerBackend( .build() }) try { - (executorId, kubernetesClient.pods().createNew() + val baseExecutorPod = new PodBuilder() .withNewMetadata() .withName(name) .withLabels(selectors) .withOwnerReferences() - .addNewOwnerReference() + .addNewOwnerReference() .withController(true) .withApiVersion(driverPod.getApiVersion) .withKind(driverPod.getKind) .withName(driverPod.getMetadata.getName) .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() - .endMetadata() + .endOwnerReference() + .endMetadata() .withNewSpec() .withHostname(hostname) .addNewContainer() @@ -219,10 +233,16 @@ private[spark] class KubernetesClusterSchedulerBackend( .addToLimits("cpu", executorCpuQuantity) .endResources() .withEnv(requiredEnv.asJava) + .addToEnv(executorExtraClasspathEnv.toSeq: _*) .withPorts(requiredPorts.asJava) .endContainer() .endSpec() - .done()) + var resolvedExecutorPod = baseExecutorPod + for (bootstrap <- executorBootstraps) { + resolvedExecutorPod = bootstrap + .bootstrapInitContainerAndVolumes("executor", resolvedExecutorPod) + } + (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build())) } catch { case throwable: Throwable => logError("Failed to allocate executor pod.", throwable) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala index d2a7e7fa7b26c..9f263a3f5eb94 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala @@ -85,6 +85,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private var captureCreatedResourcesAnswer: AllArgumentsCapturingAnswer[HasMetadata, RESOURCES] = _ private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _ private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _ + private var submittedFilesInitContainerBootstrap: SparkPodInitContainerBootstrap = _ + private var downloadRemoteFilesInitContainerBootstrap: SparkPodInitContainerBootstrap = _ private var capturedJars: Option[Seq[String]] 
= None private var capturedFiles: Option[Seq[String]] = None @@ -108,6 +110,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { remoteDependencyManager = mock[DownloadRemoteDependencyManager] credentialsMounterProvider = mock[DriverPodKubernetesCredentialsMounterProvider] credentialsMounter = mock[DriverPodKubernetesCredentialsMounter] + submittedFilesInitContainerBootstrap = mock[SparkPodInitContainerBootstrap] + downloadRemoteFilesInitContainerBootstrap = mock[SparkPodInitContainerBootstrap] when(remoteDependencyManagerProvider.getDownloadRemoteDependencyManager(any(), any(), any())) .thenAnswer(new Answer[DownloadRemoteDependencyManager] { override def answer(invocationOnMock: InvocationOnMock): DownloadRemoteDependencyManager = { @@ -124,10 +128,13 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { capturedJars.toSeq.flatten.map(Utils.resolveURI(_).getPath) } }) - when(remoteDependencyManager.configurePodToDownloadRemoteDependencies( - mockitoEq(downloadRemoteDependenciesConfigMap), - any(), - any())).thenAnswer(AdditionalAnswers.returnsArgAt(2)) + when(remoteDependencyManager.getInitContainerBootstrap(downloadRemoteDependenciesConfigMap)) + .thenReturn(downloadRemoteFilesInitContainerBootstrap) + when(downloadRemoteFilesInitContainerBootstrap.bootstrapInitContainerAndVolumes( + mockitoEq(DRIVER_CONTAINER_NAME), any())) + .thenAnswer(AdditionalAnswers.returnsArgAt(1)) + when(remoteDependencyManager.configureExecutorsToFetchRemoteDependencies(any(), any())) + .thenAnswer(AdditionalAnswers.returnsArgAt(0)) when(submissionKubernetesClientProvider.get).thenReturn(submissionKubernetesClient) when(submissionKubernetesClient.pods()).thenReturn(podOperations) captureCreatedPodAnswer = new SelfArgumentCapturingAnswer[Pod] @@ -189,32 +196,52 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { sparkConf.set(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, "-Dopt1=opt1value") sparkConf.set("spark.logConf", "true") val createdDriverPod = createAndGetDriverPod() - val maybeDriverContainer = getDriverContainer(createdDriverPod) - maybeDriverContainer.foreach { driverContainer => + val maybeJvmOptions = getDriverJvmOptions(createdDriverPod) + assert(maybeJvmOptions.isDefined) + maybeJvmOptions.foreach { jvmOptions => + assert(jvmOptions("opt1") === "opt1value") + assert(jvmOptions.contains("spark.app.id")) + assert(jvmOptions("spark.jars") === MAIN_APP_RESOURCE) + assert(jvmOptions(KUBERNETES_DRIVER_POD_NAME.key).startsWith(APP_NAME)) + assert(jvmOptions("spark.app.name") === APP_NAME) + assert(jvmOptions("spark.logConf") === "true") + } + } + + // Tests with local dependencies with the mounted dependency manager. 
+ private def getDriverJvmOptions(driverPod: Pod): Option[Map[String, String]] = { + val maybeDriverContainer = getDriverContainer(driverPod) + val maybeJvmOptions = maybeDriverContainer.flatMap { driverContainer => val maybeJvmOptionsEnv = driverContainer.getEnv .asScala .find(_.getName == ENV_DRIVER_JAVA_OPTS) - assert(maybeJvmOptionsEnv.isDefined) - maybeJvmOptionsEnv.foreach { jvmOptionsEnv => + maybeJvmOptionsEnv.map { jvmOptionsEnv => val jvmOptions = jvmOptionsEnv.getValue.split(" ") jvmOptions.foreach { opt => assert(opt.startsWith("-D")) } - val optionKeyValues = jvmOptions.map { option => + jvmOptions.map { option => val withoutDashDPrefix = option.stripPrefix("-D") val split = withoutDashDPrefix.split('=') assert(split.length == 2) (split(0), split(1)) }.toMap - assert(optionKeyValues("opt1") === "opt1value") - assert(optionKeyValues.contains("spark.app.id")) - assert(optionKeyValues("spark.jars") === MAIN_APP_RESOURCE) - assert(optionKeyValues(KUBERNETES_DRIVER_POD_NAME.key).startsWith(APP_NAME)) - assert(optionKeyValues("spark.app.name") === APP_NAME) - assert(optionKeyValues("spark.logConf") === "true") } } + maybeJvmOptions + } + + test("Uploading local dependencies should configure executors to pull from the" + + " resource staging server") { + val initContainerConfigMap = getInitContainerConfigMap + val initContainerSecret = getInitContainerSecret + runWithMountedDependencies(initContainerConfigMap, initContainerSecret) + val driverPod = captureCreatedPodAnswer.capturedArgument + val jvmOptions = getDriverJvmOptions(driverPod) + assert(jvmOptions.isDefined) + jvmOptions.foreach { options => + assert(options("spark.testing.configuredExecutorsMountLocal") === "true") + } } - // Tests with local dependencies with the mounted dependency manager. 
test("Uploading local dependencies should create Kubernetes secrets and config map") { val initContainerConfigMap = getInitContainerConfigMap val initContainerSecret = getInitContainerSecret @@ -257,13 +284,16 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } test("Remote dependency manager should configure the driver pod and the local classpath") { - Mockito.reset(remoteDependencyManager) - when(remoteDependencyManager - .configurePodToDownloadRemoteDependencies( - mockitoEq(downloadRemoteDependenciesConfigMap), any(), any())) + Mockito.reset(remoteDependencyManager, downloadRemoteFilesInitContainerBootstrap) + when(remoteDependencyManager.configureExecutorsToFetchRemoteDependencies(any(), any())) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + when(remoteDependencyManager.getInitContainerBootstrap(downloadRemoteDependenciesConfigMap)) + .thenReturn(downloadRemoteFilesInitContainerBootstrap) + when(downloadRemoteFilesInitContainerBootstrap.bootstrapInitContainerAndVolumes( + mockitoEq(DRIVER_CONTAINER_NAME), any())) .thenAnswer(new Answer[PodBuilder]() { override def answer(invocationOnMock: InvocationOnMock): PodBuilder = { - val originalPod = invocationOnMock.getArgumentAt(2, classOf[PodBuilder]) + val originalPod = invocationOnMock.getArgumentAt(1, classOf[PodBuilder]) originalPod.editMetadata().addToLabels("added-remote-dependency", "true").endMetadata() } }) @@ -272,9 +302,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(remoteDependencyManager.buildInitContainerConfigMap()) .thenReturn(downloadRemoteDependenciesConfigMap) val createdDriverPod = createAndGetDriverPod() - Mockito.verify(remoteDependencyManager).configurePodToDownloadRemoteDependencies( - mockitoEq(downloadRemoteDependenciesConfigMap), - any(), + Mockito.verify(remoteDependencyManager) + .getInitContainerBootstrap(downloadRemoteDependenciesConfigMap) + Mockito.verify(downloadRemoteFilesInitContainerBootstrap).bootstrapInitContainerAndVolumes( + mockitoEq(DRIVER_CONTAINER_NAME), any()) assert(createdDriverPod.getMetadata.getLabels.get("added-remote-dependency") === "true") val driverContainer = createdDriverPod @@ -347,20 +378,30 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(mountedDependencyManager.buildInitContainerConfigMap( DOWNLOAD_JARS_RESOURCE_IDENTIFIER.resourceId, DOWNLOAD_FILES_RESOURCE_IDENTIFIER.resourceId)) .thenReturn(initContainerConfigMap) + when(mountedDependencyManager.configureExecutorsToFetchSubmittedDependencies( + any(), any(), any())) + .thenAnswer(new Answer[SparkConf]() { + override def answer(invocationOnMock: InvocationOnMock): SparkConf = { + val originalSparkConf = invocationOnMock.getArgumentAt(0, classOf[SparkConf]) + originalSparkConf.clone().set("spark.testing.configuredExecutorsMountLocal", "true") + } + }) when(mountedDependencyManager.resolveSparkJars()).thenReturn(RESOLVED_SPARK_JARS) when(mountedDependencyManager.resolveSparkFiles()).thenReturn(RESOLVED_SPARK_FILES) - when(mountedDependencyManager.configurePodToMountLocalDependencies( - mockitoEq(DRIVER_CONTAINER_NAME), - mockitoEq(initContainerSecret), - mockitoEq(initContainerConfigMap), - any())).thenAnswer(new Answer[PodBuilder] { - override def answer(invocationOnMock: InvocationOnMock): PodBuilder = { - val basePod = invocationOnMock.getArgumentAt(3, classOf[PodBuilder]) - basePod.editMetadata().addToAnnotations(MOUNTED_FILES_ANNOTATION_KEY, "true").endMetadata() - } - }) - val clientUnderTest = createClient() - clientUnderTest.run() + 
when(mountedDependencyManager.getInitContainerBootstrap( + initContainerSecret, + initContainerConfigMap)).thenReturn(submittedFilesInitContainerBootstrap) + when(submittedFilesInitContainerBootstrap.bootstrapInitContainerAndVolumes( + mockitoEq(DRIVER_CONTAINER_NAME), any())) + .thenAnswer(new Answer[PodBuilder]() { + override def answer(invocationOnMock: InvocationOnMock): PodBuilder = { + val basePod = invocationOnMock.getArgumentAt(1, classOf[PodBuilder]) + basePod.editMetadata() + .addToAnnotations(MOUNTED_FILES_ANNOTATION_KEY, "true") + .endMetadata() + } + }) + createAndGetDriverPod() } private def getDriverContainer(driverPod: Pod): Option[Container] = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala index df41516d10f91..17fff111815e0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DownloadRemoteDependencyManagerSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.deploy.kubernetes.constants._ class DownloadRemoteDependencyManagerSuite extends SparkFunSuite { - private val OBJECT_MAPPER = new ObjectMapper() private val APP_ID = "app-id" private val SPARK_JARS = Seq( "hdfs://localhost:9000/jar1.jar", @@ -71,82 +70,12 @@ class DownloadRemoteDependencyManagerSuite extends SparkFunSuite { val downloadRemoteFilesSplit = downloadRemoteFiles.split(",").toSet val expectedRemoteFiles = Set("hdfs://localhost:9000/file.txt") assert(downloadRemoteFilesSplit === expectedRemoteFiles) - assert(initContainerProperties.getProperty(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION.key) === + assert(initContainerProperties.getProperty(REMOTE_JARS_DOWNLOAD_LOCATION.key) === JARS_DOWNLOAD_PATH) - assert(initContainerProperties.getProperty(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION.key) === + assert(initContainerProperties.getProperty(REMOTE_FILES_DOWNLOAD_LOCATION.key) === FILES_DOWNLOAD_PATH) } - test("Pod should have an appropriate init-container attached") { - val originalPodSpec = new PodBuilder() - .withNewMetadata() - .withName("driver") - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName("driver") - .endContainer() - .endSpec() - val configMap = new ConfigMapBuilder() - .withNewMetadata() - .withName("config-map") - .endMetadata() - .build() - val adjustedPod = dependencyManagerUnderTest.configurePodToDownloadRemoteDependencies( - configMap, "driver", originalPodSpec).build() - val annotations = adjustedPod.getMetadata.getAnnotations - assert(annotations.size === 1) - val initContainerAnnotation = annotations.get(INIT_CONTAINER_ANNOTATION) - assert(annotations != null) - val initContainers = OBJECT_MAPPER.readValue(initContainerAnnotation, classOf[Array[Container]]) - assert(initContainers.length === 1) - val initContainer = initContainers(0) - assert(initContainer.getName === INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME) - assert(initContainer.getArgs.size() === 1) - assert(initContainer.getArgs.get(0) === INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH) - assert(initContainer.getImage === INIT_CONTAINER_IMAGE) - assert(initContainer.getImagePullPolicy === "IfNotPresent") - val initContainerVolumeMounts = initContainer - .getVolumeMounts - .asScala - .map { 
mount => - (mount.getName, mount.getMountPath) - }.toSet - val expectedVolumeMounts = Set( - (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME, JARS_DOWNLOAD_PATH), - (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME, FILES_DOWNLOAD_PATH), - (INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME, - INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH)) - assert(initContainerVolumeMounts === expectedVolumeMounts) - val podVolumes = adjustedPod.getSpec.getVolumes.asScala.map { volume => - (volume.getName, volume) - }.toMap - assert(podVolumes.size === 3) - assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME).isDefined) - val propertiesConfigMapVolume = podVolumes(INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME) - assert(propertiesConfigMapVolume.getConfigMap != null) - val configMapItems = propertiesConfigMapVolume.getConfigMap.getItems.asScala - assert(configMapItems.size === 1) - assert(configMapItems(0).getKey === INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY) - assert(configMapItems(0).getPath === INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME) - assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME).isDefined) - assert(podVolumes(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME).getEmptyDir != null) - assert(podVolumes.get(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME).isDefined) - assert(podVolumes(INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME).getEmptyDir != null) - val addedVolumeMounts = adjustedPod - .getSpec - .getContainers - .get(0) - .getVolumeMounts - .asScala - .map { mount => (mount.getName, mount.getMountPath) } - .toSet - val expectedAddedVolumeMounts = Set( - (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME, JARS_DOWNLOAD_PATH), - (INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME, FILES_DOWNLOAD_PATH)) - assert(addedVolumeMounts === expectedAddedVolumeMounts) - } - test("Resolving the local classpath should map remote jars to their downloaded locations") { val resolvedLocalClasspath = dependencyManagerUnderTest.resolveLocalClasspath() val expectedLocalClasspath = Set( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala index 4b04681868d38..3311dea3986be 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyManagerSuite.scala @@ -52,8 +52,8 @@ private[spark] class SubmittedDependencyManagerSuite extends SparkFunSuite with private val NAMESPACE = "namespace" private val STAGING_SERVER_URI = "http://localhost:8000" private val INIT_CONTAINER_IMAGE = "spark-driver-init:latest" - private val JARS_DOWNLOAD_PATH = DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.defaultValue.get - private val FILES_DOWNLOAD_PATH = DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.defaultValue.get + private val JARS_DOWNLOAD_PATH = SUBMITTED_JARS_DOWNLOAD_LOCATION.defaultValue.get + private val FILES_DOWNLOAD_PATH = SUBMITTED_FILES_DOWNLOAD_LOCATION.defaultValue.get private val DOWNLOAD_TIMEOUT_MINUTES = 5 private val LOCAL_JARS = Seq(createTempFile("jar"), createTempFile("jar")) private val JARS = Seq("hdfs://localhost:9000/jars/jar1.jar", @@ -146,15 +146,15 @@ private[spark] class 
SubmittedDependencyManagerSuite extends SparkFunSuite with }.toMap val expectedProperties = Map[String, String]( RESOURCE_STAGING_SERVER_URI.key -> STAGING_SERVER_URI, - DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH, - DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH, + SUBMITTED_JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH, + SUBMITTED_FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH, INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> JARS_RESOURCE_ID, INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key -> INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH, INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> FILES_RESOURCE_ID, INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key -> INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH, - DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${DOWNLOAD_TIMEOUT_MINUTES}m", + MOUNT_DEPENDENCIES_INIT_TIMEOUT.key -> s"${DOWNLOAD_TIMEOUT_MINUTES}m", RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key -> INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_PATH, RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> "true", RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key -> TRUSTSTORE_PASSWORD, @@ -179,91 +179,6 @@ private[spark] class SubmittedDependencyManagerSuite extends SparkFunSuite with assert(resolvedFiles === expectedResolvedFiles) } - test("Downloading init container should be added to pod") { - val driverPod = configureDriverPod() - val podAnnotations = driverPod.getMetadata.getAnnotations - assert(podAnnotations.size === 1) - val initContainerRawAnnotation = podAnnotations.get(INIT_CONTAINER_ANNOTATION) - val initContainers = OBJECT_MAPPER.readValue( - initContainerRawAnnotation, classOf[Array[Container]]) - assert(initContainers.size === 1) - val initContainer = initContainers.head - assert(initContainer.getName === INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME) - assert(initContainer.getImage === INIT_CONTAINER_IMAGE) - assert(initContainer.getImagePullPolicy === "IfNotPresent") - val volumeMounts = initContainer.getVolumeMounts - .asScala - .map(mount => (mount.getName, mount.getMountPath)) - .toMap - val expectedVolumeMounts = Map[String, String]( - INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME -> JARS_DOWNLOAD_PATH, - INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH, - INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME -> - INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH, - INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME -> - INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH) - assert(volumeMounts === expectedVolumeMounts) - } - - test("Driver pod should have added volumes and volume mounts for file downloads") { - val driverPod = configureDriverPod() - val volumes = driverPod.getSpec.getVolumes.asScala.map(volume => (volume.getName, volume)).toMap - val initContainerPropertiesVolume = volumes( - INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME).getConfigMap - assert(initContainerPropertiesVolume != null) - assert(initContainerPropertiesVolume.getName === "config") - assert(initContainerPropertiesVolume.getItems.asScala.exists { keyToPath => - keyToPath.getKey == INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY && - keyToPath.getPath == INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME - }) - val jarsVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME) - assert(jarsVolume.getEmptyDir != null) - val filesVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME) - assert(filesVolume.getEmptyDir != 
null) - val initContainerSecretVolume = volumes(INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME) - assert(initContainerSecretVolume.getSecret != null) - assert(initContainerSecretVolume.getSecret.getSecretName === "secret") - val driverContainer = driverPod.getSpec - .getContainers - .asScala - .find(_.getName == "driver-container").get - val driverContainerVolumeMounts = driverContainer.getVolumeMounts - .asScala - .map(mount => (mount.getName, mount.getMountPath)) - .toMap - val expectedVolumeMountNamesAndPaths = Map[String, String]( - INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME -> JARS_DOWNLOAD_PATH, - INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME -> FILES_DOWNLOAD_PATH) - assert(driverContainerVolumeMounts === expectedVolumeMountNamesAndPaths) - } - - private def configureDriverPod(): Pod = { - val initContainerSecret = new SecretBuilder() - .withNewMetadata().withName("secret").endMetadata() - .addToData("datakey", "datavalue") - .build() - val initContainerConfigMap = new ConfigMapBuilder() - .withNewMetadata().withName("config").endMetadata() - .addToData("datakey", "datavalue") - .build() - val basePod = new PodBuilder() - .withNewMetadata() - .withName("driver-pod") - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName("driver-container") - .withImage("spark-driver:latest") - .endContainer() - .endSpec() - val adjustedPod = dependencyManagerUnderTest.configurePodToMountLocalDependencies( - "driver-container", - initContainerSecret, - initContainerConfigMap, - basePod).build() - adjustedPod - } - private def testUploadSendsCorrectFiles( expectedFiles: Seq[String], capturingArgumentsAnswer: UploadDependenciesArgumentsCapturingAnswer) = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala index a572981519857..838f27442158f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala @@ -141,8 +141,8 @@ class KubernetesSparkDependencyDownloadInitContainerSuite .set(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION, DOWNLOAD_JARS_SECRET_LOCATION) .set(INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER, FILES_RESOURCE_ID) .set(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION, DOWNLOAD_FILES_SECRET_LOCATION) - .set(DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath) - .set(DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath) + .set(SUBMITTED_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath) + .set(SUBMITTED_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath) .set(RESOURCE_STAGING_SERVER_SSL_ENABLED, true) .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE, TRUSTSTORE_FILE.getAbsolutePath) .set(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD) @@ -155,8 +155,8 @@ class KubernetesSparkDependencyDownloadInitContainerSuite "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar") .set(INIT_CONTAINER_REMOTE_FILES, "http://localhost:9000/file.txt") - .set(DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath) - .set(DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION, 
downloadFilesDir.getAbsolutePath)
+      .set(REMOTE_JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath)
+      .set(REMOTE_FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath)
   }
 
   private def checkWrittenFilesAreTheSameAsOriginal(
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index 23c6751f1b3ed..c5f1c43ff7cf4 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -37,4 +37,7 @@ ENV SPARK_HOME /opt/spark
 WORKDIR /opt/spark
 
 # TODO support spark.executor.extraClassPath
-CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+    exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
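
For clarity, the classpath assembly performed by the new executor CMD above can be read as the following minimal shell sketch. This is not part of the patch; it only restates the CMD's intent and assumes the SPARK_HOME, SPARK_MOUNTED_CLASSPATH, and SPARK_EXECUTOR_EXTRA_CLASSPATH environment variables that the Kubernetes scheduler backend may set on the executor pod. The ${VAR+x} expansion yields "x" only when VAR is set, so an unset variable never contributes an empty classpath entry:

    #!/bin/sh
    # Minimal sketch of the executor classpath resolution done in the CMD above.
    # Start from the jars bundled in the image.
    SPARK_CLASSPATH="${SPARK_HOME}/jars/*"
    # Prepend dependencies downloaded by the init-containers, if the variable is set.
    if ! [ -z "${SPARK_MOUNTED_CLASSPATH+x}" ]; then
      SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"
    fi
    # Prepend the user-specified extra classpath, if the variable is set.
    if ! [ -z "${SPARK_EXECUTOR_EXTRA_CLASSPATH+x}" ]; then
      SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"
    fi
    echo "$SPARK_CLASSPATH"

The resulting search order is the executor extra classpath first, then any dependencies the init-containers downloaded, and finally the jars bundled under ${SPARK_HOME}/jars.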