From eb017f2f593aef6eed5075446c75b1f35a534223 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 7 Sep 2017 18:58:44 -0700 Subject: [PATCH 1/8] Use emptyDir volume mounts for executor local directories. --- .../apache/spark/deploy/k8s/constants.scala | 2 + .../cluster/k8s/ExecutorPodFactory.scala | 52 +++++++++++++++++-- .../k8s/KubernetesClusterManager.scala | 15 +++--- 3 files changed, 59 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index d8aec7d3c5bd7..cee81b3808d7e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -71,6 +71,7 @@ package object constants { private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" + private[spark] val ENV_SPARK_LOCAL_DIRS = "SPARK_LOCAL_DIRS" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" @@ -102,4 +103,5 @@ package object constants { private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L + private[spark] val GENERATED_LOCAL_DIR_MOUNT_ROOT = "/mnt/tmp/spark-local" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 6f4ba1c8b888f..efbd4be7146a7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.scheduler.cluster.k8s -import scala.collection.JavaConverters._ +import java.io.File +import java.nio.file.Paths +import java.util.UUID import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} @@ -262,9 +264,53 @@ private[spark] class ExecutorPodFactoryImpl( val executorPodWithNodeAffinity = nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) - new PodBuilder(executorPodWithNodeAffinity) + + val (executorPodWithTempLocalDirs, executorContainerWithTempLocalDirs) = + if (shuffleManager.isEmpty) { + // If we're not using the external shuffle manager, we should use emptyDir volumes for + // shuffle directories since it's important for disk I/O for these directories to be + // performant. If the user has not provided a local directory, instead of using the + // Java temporary directory, we create one instead. This is because we want to avoid + // as much as possible mounting an emptyDir which overlaps with an existing path in + // the Docker image, which is very likely what would happen if we tried to mount the + // volume at Java's temporary directory path, which is /tmp in many JDKs. 
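(Illustration, outside the patch itself: the code just below derives a stable volume name from each directory's final path segment. A quick sketch with hypothetical paths:)

```scala
import java.nio.file.Paths

// Hypothetical inputs; mirrors the s"spark-local-dir-$index-..." naming below.
val dirs = Seq("/mnt/tmp/spark-a", "/var/tmp/spark-b")
val names = dirs.zipWithIndex.map { case (dir, index) =>
  s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}"
}
// names == List("spark-local-dir-0-spark-a", "spark-local-dir-1-spark-b")
```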
+ val resolvedLocalDirs = sparkConf.get( + "spark.local.dir", s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${UUID.randomUUID()}") + .split(",") + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + // Setting the SPARK_LOCAL_DIRS environment variable will force the executor to use the + // generated directory if the user did not provide one, as opposed to using the Java + // temporary directory. This also overrides the value of spark.local.dir in SparkConf, + // which is intended. See Utils#getConfiguredLocalDirs(). + (new PodBuilder(executorPodWithNodeAffinity) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build(), + new ContainerBuilder(initBootstrappedExecutorContainer) + .addToVolumeMounts(localDirVolumeMounts: _*) + .addNewEnv() + .withName(ENV_SPARK_LOCAL_DIRS) + .withValue(resolvedLocalDirs.mkString(",")) + .endEnv() + .build()) + } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer) + + new PodBuilder(executorPodWithTempLocalDirs) .editSpec() - .addToContainers(initBootstrappedExecutorContainer) + .addToContainers(executorContainerWithTempLocalDirs) .endSpec() .build() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index cd92df439a7e6..587420611544a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -113,15 +113,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) - val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) { + val kubernetesShuffleManager = if (sparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) { val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( - SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), - sc.env.securityManager, - sc.env.securityManager.isAuthenticationEnabled()) + SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), + sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled()) Some(new KubernetesExternalShuffleManagerImpl( - sparkConf, - kubernetesClient, - kubernetesExternalShuffleClient)) + sparkConf, + kubernetesClient, + kubernetesExternalShuffleClient)) } else None val executorPodFactory = new ExecutorPodFactoryImpl( From ba3c486a52733823dcf255d1d3375d7dafb09db2 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 18:37:37 -0700 Subject: [PATCH 2/8] Mount local dirs in the driver. Remove shuffle dir configuration. 
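This relies on the precedence implemented by `Utils#getConfiguredLocalDirs()`: once the driver resolves `spark.local.dir`, the executors pick up the same value without the explicit SPARK_LOCAL_DIRS plumbing above. A simplified sketch of that precedence, illustrative only (the real method also special-cases YARN, Mesos, and SPARK_EXECUTOR_DIRS):

```scala
import org.apache.spark.SparkConf

// Simplified sketch, not the actual Spark implementation.
def sketchConfiguredLocalDirs(conf: SparkConf): Array[String] =
  sys.env.get("SPARK_LOCAL_DIRS")                     // environment variable wins,
    .orElse(conf.getOption("spark.local.dir"))        // then the SparkConf entry,
    .getOrElse(System.getProperty("java.io.tmpdir"))  // then the JVM temp dir.
    .split(",")
```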
--- .../org/apache/spark/deploy/k8s/config.scala | 6 -- ...DriverConfigurationStepsOrchestrator.scala | 6 +- ...LocalDirectoryMountConfigurationStep.scala | 99 +++++++++++++++++++ .../cluster/k8s/ExecutorPodFactory.scala | 8 +- .../KubernetesExternalShuffleManager.scala | 4 +- ...rConfigurationStepsOrchestratorSuite.scala | 7 +- 6 files changed, 112 insertions(+), 18 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index fc08c0ad42f82..0e35e04ff5803 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -157,12 +157,6 @@ package object config extends Logging { .stringConf .createOptional - private[spark] val KUBERNETES_SHUFFLE_DIR = - ConfigBuilder("spark.kubernetes.shuffle.dir") - .doc("Path to the shared shuffle directories.") - .stringConf - .createOptional - private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI = ConfigBuilder("spark.kubernetes.shuffle.apiServer.url") .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index fd251637ce210..2e4c6535a46c7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -104,6 +104,9 @@ private[spark] class DriverConfigurationStepsOrchestrator( val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( submissionSparkConf, kubernetesResourceNamePrefix) + val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep( + submissionSparkConf) + val pythonStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) @@ -181,7 +184,8 @@ private[spark] class DriverConfigurationStepsOrchestrator( initialSubmissionStep, driverAddressStep, kubernetesCredentialsStep, - dependencyResolutionStep) ++ + dependencyResolutionStep, + localDirectoryMountConfigurationStep) ++ submittedDependenciesBootstrapSteps ++ pythonStep.toSeq ++ mountSecretsStep.toSeq diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala new file mode 100644 index 0000000000000..bfad4e3ec7c63 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala @@ -0,0 +1,99 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import java.nio.file.Paths +import java.util.UUID + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} + +/** + * Configures local directories that the driver and executors should use for temporary storage. + * + * Note that we have different semantics for scratch space in Kubernetes versus the other cluster + * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary + * directory. This is because we will mount either emptyDir volumes for both the driver and + * executors, or hostPath volumes for the executors and an emptyDir for the driver. In either + * case, the mount paths need to be directories that do not exist in the base container images. + * But the Java temporary directory is typically a directory like /tmp which exists in most + * container images. + * + * The solution is twofold: + * - When not using an external shuffle service, a reasonable default is to create a new directory with + * a random name and set that to be the value of `spark.local.dir`. + * - When using the external shuffle service, it is risky to assume that the user intends to mount + * the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that + * spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the paths + * that have to be mounted. + */ +private[spark] class LocalDirectoryMountConfigurationStep( + submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir") + val isUsingExternalShuffle = submissionSparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED) + val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) { + require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" + + " using the external shuffle service in Kubernetes. These directories should map to" + + " the paths that are mounted into the external shuffle service pods.") + configuredLocalDirs.get + } else { + // If we don't use the external shuffle service, local directories should be randomized if + // not provided. 
+ configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/spark-${UUID.randomUUID}") + } + val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",") + // It's worth noting that we always use an emptyDir volume for the directories on the driver, + // because the driver does not need a hostPath to share its scratch space with any other pod. + // The driver itself will decide on whether to use a hostPath volume or an emptyDir volume for + // these directories on the executors. (see ExecutorPodFactory and + // KubernetesExternalClusterManager) + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set( + "spark.local.dir", resolvedLocalDirsSingleString) + driverSpec.copy( + driverPod = new PodBuilder(driverSpec.driverPod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build(), + driverContainer = new ContainerBuilder(driverSpec.driverContainer) + .addToVolumeMounts(localDirVolumeMounts: _*) + .build(), + driverSparkConf = resolvedDriverSparkConf + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index efbd4be7146a7..0690ec5c81337 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -274,9 +274,7 @@ private[spark] class ExecutorPodFactoryImpl( // as much as possible mounting an emptyDir which overlaps with an existing path in // the Docker image, which is very likely what would happen if we tried to mount the // volume at Java's temporary directory path, which is /tmp in many JDKs. 
- val resolvedLocalDirs = sparkConf.get( - "spark.local.dir", s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${UUID.randomUUID()}") - .split(",") + val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => new VolumeBuilder() .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") @@ -301,10 +299,6 @@ private[spark] class ExecutorPodFactoryImpl( .build(), new ContainerBuilder(initBootstrappedExecutorContainer) .addToVolumeMounts(localDirVolumeMounts: _*) - .addNewEnv() - .withName(ENV_SPARK_LOCAL_DIRS) - .withValue(resolvedLocalDirs.mkString(",")) - .endEnv() .build()) } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala index 181f22dfe3dcc..690ebd4e9adb8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala @@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl( s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") } private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337) - private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map { - _.split(",") - }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf)) + private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf) private var shufflePodCache = scala.collection.mutable.Map[String, String]() private var watcher: Watch = _ diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index 6f5d5e571c443..d67820ccb56bd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -52,7 +52,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], - classOf[DependencyResolutionStep]) + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep]) } test("Submission steps with an init-container.") { @@ -76,6 +77,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[InitContainerBootstrapStep]) } @@ -98,6 +100,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[PythonStep]) } @@ -120,6 +123,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS 
classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSmallLocalFilesStep]) } @@ -144,6 +148,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSecretsStep]) } From a6ef915afa02d9caf17d0e750f9e6a66c53d5ad5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 10 Oct 2017 17:18:29 -0700 Subject: [PATCH 3/8] Arrange imports --- .../DriverConfigurationStepsOrchestrator.scala | 1 + .../LocalDirectoryMountConfigurationStep.scala | 15 +++++++-------- .../cluster/k8s/ExecutorPodFactory.scala | 4 +++- ...riverConfigurationStepsOrchestratorSuite.scala | 1 + .../cluster/k8s/ExecutorPodFactorySuite.scala | 1 - 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index 2e4c6535a46c7..76fde6a98a5ef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator +import org.apache.spark.deploy.kubernetes.submit.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{SystemClock, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala index bfad4e3ec7c63..1e2781bc3ef8c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with @@ -23,9 +22,9 @@ import java.util.UUID import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.spark.SparkConf -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} /** * Configures local directories that the driver and executors should use for temporary storage. @@ -39,12 +38,12 @@ import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfiguratio * container images. * * The solution is twofold: - * - When not using an external shuffle service, a reasonable default is to create a new directory with - * a random name and set that to be the value of `spark.local.dir`. + * - When not using an external shuffle service, a reasonable default is to create a new directory + * with a random name and set that to be the value of `spark.local.dir`. * - When using the external shuffle service, it is risky to assume that the user intends to mount * the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that - * spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the paths - * that have to be mounted. + * spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the + * paths that have to be mounted. */ private[spark] class LocalDirectoryMountConfigurationStep( submissionSparkConf: SparkConf) extends DriverConfigurationStep { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 0690ec5c81337..c30d80d861ac1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -20,7 +20,9 @@ import java.io.File import java.nio.file.Paths import java.util.UUID -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} +import org.apache.commons.io.FilenameUtils +import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index d67820ccb56bd..ed291b0ca4972 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.LocalDirectoryMountConfigurationStep private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 6690217557637..a0ddaf4f71786 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -183,7 +183,6 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef val conf = baseConf.clone() conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value") conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") - conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp") val shuffleManager = mock(classOf[KubernetesExternalShuffleManager]) when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({ From f2471a6cd70f285e47009d7e650a57bdb7792cd6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 20:31:37 -0700 Subject: [PATCH 4/8] Fix style and integration tests. --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index d6e12df9d1ba2..27041207ffdce 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -150,6 +150,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) sparkConf.set("spark.dynamicAllocation.enabled", "true") + sparkConf.set("spark.local.dir", "/tmp") sparkConf.set("spark.shuffle.service.enabled", "true") sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service") sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace) From fb414c3054f316e1da831f9d38e9aba1889f38d5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 8 Sep 2017 20:34:54 -0700 Subject: [PATCH 5/8] Add TODO note for volume types to change. 
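For context, these are the two volume flavors the TODO below distinguishes, built with the same fabric8 builders used elsewhere in this series (names and paths are illustrative):

```scala
import io.fabric8.kubernetes.api.model.VolumeBuilder

// emptyDir: created with the pod and deleted with it; scratch space whose
// lifetime should match the executor (broadcast and cache blocks).
val scratchVolume = new VolumeBuilder()
  .withName("spark-local-dir-0-scratch")
  .withNewEmptyDir().endEmptyDir()
  .build()

// hostPath: a node directory that outlives the pod, so a shuffle service
// pod on the same node can read the executor's shuffle files.
val sharedVolume = new VolumeBuilder()
  .withName("spark-local-dir-0-shared")
  .withNewHostPath("/tmp/spark-local")
  .build()
```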
--- .../cluster/k8s/KubernetesExternalShuffleManager.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala index 690ebd4e9adb8..388e2b17f4fdd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala @@ -138,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl( } override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { + // TODO: Using hostPath for the local directory will also make it such that the + // other uses of the local directory - broadcasting and caching - will also write + // to the directory that the shuffle service is aware of. It would be better for + // these directories to be separate so that the lifetime of the non-shuffle scratch + // space is tied to an emptyDir instead of the hostPath. This requires a change in + // core Spark as well. shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) => val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" From 879207a1bd3f8b727b84b367ce05f7b199ae180f Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 10 Oct 2017 17:37:08 -0700 Subject: [PATCH 6/8] Add unit test and extra documentation. --- conf/kubernetes-shuffle-service.yaml | 4 +- docs/running-on-kubernetes.md | 11 ++- ...DriverConfigurationStepsOrchestrator.scala | 2 +- ...LocalDirectoryMountConfigurationStep.scala | 10 +- .../cluster/k8s/ExecutorPodFactory.scala | 8 +- ...rConfigurationStepsOrchestratorSuite.scala | 3 +- ...DirectoryMountConfigurationStepSuite.scala | 91 +++++++++++++++++++ 7 files changed, 114 insertions(+), 15 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/{kubernetes/submit => k8s/submit/submitsteps}/LocalDirectoryMountConfigurationStep.scala (93%) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala diff --git a/conf/kubernetes-shuffle-service.yaml b/conf/kubernetes-shuffle-service.yaml index 6f02106ff06c5..b22adb265fd18 100644 --- a/conf/kubernetes-shuffle-service.yaml +++ b/conf/kubernetes-shuffle-service.yaml @@ -32,7 +32,7 @@ spec: volumes: - name: temp-volume hostPath: - path: '/tmp' # change this path according to your cluster configuration. + path: '/tmp/spark-local' # change this path according to your cluster configuration. containers: - name: shuffle # This is an official image that is built @@ -41,7 +41,7 @@ spec: image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0 imagePullPolicy: IfNotPresent volumeMounts: - - mountPath: '/tmp' + - mountPath: '/tmp/spark-local' name: temp-volume # more volumes can be mounted here. 
 # The spark job must be configured to use these
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3e9096d681642..99d356044b146 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -222,7 +222,7 @@ Below is an example submission:
     local:///opt/spark/examples/src/main/python/pi.py 100
 ```
 
-## Dynamic Executor Scaling
+## Dynamic Allocation in Kubernetes
 
 Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
 an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
@@ -245,6 +245,7 @@ the command may then look like the following:
     --class org.apache.spark.examples.GroupByTest \
     --master k8s://<k8s-apiserver-host>:<k8s-apiserver-port> \
     --kubernetes-namespace default \
+    --conf spark.local.dir=/tmp/spark-local \
     --conf spark.app.name=group-by-test \
     --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
     --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
     --conf spark.dynamicAllocation.enabled=true \
     --conf spark.shuffle.service.enabled=true \
     --conf spark.kubernetes.shuffle.namespace=default \
     --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
     local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2
 
+The external shuffle service has to mount directories that can be shared with the executor pods. The provided example
+YAML spec mounts a hostPath volume to the external shuffle service pods, but these hostPath volumes must also be mounted
+into the executors. When using the external shuffle service, the directories specified in the `spark.local.dir`
+configuration are mounted as hostPath volumes into all of the executor containers. To ensure that one does not
+accidentally mount the incorrect hostPath volumes, the value of `spark.local.dir` must be specified in your
+application's configuration when using Kubernetes, even though it defaults to the JVM's temporary directory when using
+other cluster managers.
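The fail-fast check backing this requirement lives in `LocalDirectoryMountConfigurationStep` (patch 2 above); reduced to its essentials, it behaves roughly like this sketch:

```scala
import org.apache.spark.SparkConf

// Sketch of the submission-time validation; mirrors the require() in
// LocalDirectoryMountConfigurationStep rather than reimplementing it.
val conf = new SparkConf(false)
if (conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false)) {
  require(conf.getOption("spark.local.dir").isDefined,
    "spark.local.dir must be provided explicitly when using the external" +
      " shuffle service in Kubernetes.")
}
```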
+ ## Advanced ### Securing the Resource Staging Server with TLS diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index 76fde6a98a5ef..92ec8e5d85260 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -22,7 +22,7 @@ import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator -import org.apache.spark.deploy.kubernetes.submit.LocalDirectoryMountConfigurationStep +import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{SystemClock, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala similarity index 93% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala index 1e2781bc3ef8c..3f9ba8af74162 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/LocalDirectoryMountConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.kubernetes.submit +package org.apache.spark.deploy.k8s.submit.submitsteps import java.nio.file.Paths import java.util.UUID @@ -22,9 +22,7 @@ import java.util.UUID import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} /** * Configures local directories that the driver and executors should use for temporary storage. @@ -46,7 +44,9 @@ import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, * paths that have to be mounted. 
*/ private[spark] class LocalDirectoryMountConfigurationStep( - submissionSparkConf: SparkConf) extends DriverConfigurationStep { + submissionSparkConf: SparkConf, + randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}") + extends DriverConfigurationStep { override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir") @@ -60,7 +60,7 @@ private[spark] class LocalDirectoryMountConfigurationStep( } else { // If we don't use the external shuffle service, local directories should be randomized if // not provided. - configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/spark-${UUID.randomUUID}") + configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}") } val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",") // It's worth noting that we always use an emptyDir volume for the directories on the driver, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index c30d80d861ac1..d0a39193c634c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -272,10 +272,10 @@ private[spark] class ExecutorPodFactoryImpl( // If we're not using the external shuffle manager, we should use emptyDir volumes for // shuffle directories since it's important for disk I/O for these directories to be // performant. If the user has not provided a local directory, instead of using the - // Java temporary directory, we create one instead. This is because we want to avoid - // as much as possible mounting an emptyDir which overlaps with an existing path in - // the Docker image, which is very likely what would happen if we tried to mount the - // volume at Java's temporary directory path, which is /tmp in many JDKs. + // Java temporary directory, we create one instead, because we want to avoid + // mounting an emptyDir which overlaps with an existing path in the Docker image. + // Java's temporary directory path is typically /tmp or a similar path, which is + // likely to exist in most images. 
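(Context, outside the patch: by this point the driver has already resolved `spark.local.dir` via `LocalDirectoryMountConfigurationStep`, so `Utils.getConfiguredLocalDirs` below returns the same paths that were mounted into the driver pod. The `randomDirProvider` hook added above exists so tests can pin the otherwise random directory name; a sketch of that usage, mirroring the suite added later in this patch:)

```scala
import org.apache.spark.SparkConf

// Hypothetical test usage; production code keeps the UUID-based default.
val sparkConf = new SparkConf(false)
val step = new LocalDirectoryMountConfigurationStep(sparkConf, () => "local-dir")
// With spark.local.dir unset and the external shuffle service disabled,
// the step resolves s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/local-dir".
```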
val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => new VolumeBuilder() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index ed291b0ca4972..1199f033cf06a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} -import org.apache.spark.deploy.kubernetes.submit.LocalDirectoryMountConfigurationStep +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala new file mode 100644 index 0000000000000..5ce199a5df857 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.nio.file.Paths + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.submitsteps.{KubernetesDriverSpec, LocalDirectoryMountConfigurationStep} + +private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite { + + test("When using the external shuffle service, the local directories must be provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, true) + val configurationStep = new LocalDirectoryMountConfigurationStep(sparkConf) + try { + configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) + fail("The configuration step should have failed without local dirs.") + } catch { + case e: Throwable => + assert(e.getMessage === "requirement failed: spark.local.dir must be provided explicitly" + + " when using the external shuffle service in Kubernetes. These directories should map" + + " to the paths that are mounted into the external shuffle service pods.") + } + } + + test("When not using the external shuffle service, a random directory should be set" + + " for local dirs if one is not provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf, () => "local-dir") + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/local-dir")) + } + + test("When not using the external shuffle service, provided local dirs should be mounted as" + + " emptyDirs.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + .set("spark.local.dir", "/mnt/tmp/spark-local,/var/tmp/spark-local") + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf) + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq("/mnt/tmp/spark-local", "/var/tmp/spark-local")) + } + + private def testLocalDirsMatch( + resolvedDriverSpec: KubernetesDriverSpec, expectedLocalDirs: Seq[String]): Unit = { + assert(resolvedDriverSpec.driverSparkConf.get("spark.local.dir").split(",") === + expectedLocalDirs) + expectedLocalDirs + .zip(resolvedDriverSpec.driverPod.getSpec.getVolumes.asScala) + .zipWithIndex + .foreach { + case ((dir, volume), index) => + assert(volume.getEmptyDir != null) + val fileName = Paths.get(dir).getFileName.toString + assert(volume.getName === s"spark-local-dir-$index-$fileName") + } + + expectedLocalDirs + .zip(resolvedDriverSpec.driverContainer.getVolumeMounts.asScala) + .zipWithIndex + .foreach { + case ((dir, volumeMount), index) => + val fileName = Paths.get(dir).getFileName.toString + assert(volumeMount.getName === s"spark-local-dir-$index-$fileName") + assert(volumeMount.getMountPath === dir) + } + } +} From 02b283e09c9fc15db8c0e1090be417eeff0d2029 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 16 Oct 2017 14:39:34 -0700 Subject: [PATCH 7/8] Fix existing unit tests and add tests for empty dir volumes --- .../k8s/ExecutorLocalDirVolumeProvider.scala | 61 +++++++++ .../cluster/k8s/ExecutorPodFactory.scala | 53 +------- 
.../k8s/KubernetesClusterManager.scala | 4 +- .../ExecutorLocalDirVolumeProviderSuite.scala | 64 +++++++++ .../cluster/k8s/ExecutorPodFactorySuite.scala | 122 +++++++++--------- 5 files changed, 192 insertions(+), 112 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala new file mode 100644 index 0000000000000..2b35fd6a513f5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.nio.file.Paths + +import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +private[spark] trait ExecutorLocalDirVolumeProvider { + def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] +} + +private[spark] class ExecutorLocalDirVolumeProviderImpl( + sparkConf: SparkConf, + kubernetesExternalShuffleManager: Option[KubernetesExternalShuffleManager]) + extends ExecutorLocalDirVolumeProvider { + override def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] = { + kubernetesExternalShuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts) + .getOrElse { + // If we're not using the external shuffle manager, we should use emptyDir volumes for + // shuffle directories since it's important for disk I/O for these directories to be + // performant. If the user has not provided a local directory, instead of using the + // Java temporary directory, we create one instead, because we want to avoid + // mounting an emptyDir which overlaps with an existing path in the Docker image. + // Java's temporary directory path is typically /tmp or a similar path, which is + // likely to exist in most images. 
+ val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + localDirVolumes.zip(localDirVolumeMounts) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index d0a39193c634c..98a0d879b6a58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,12 +16,7 @@ */ package org.apache.spark.scheduler.cluster.k8s -import java.io.File -import java.nio.file.Paths -import java.util.UUID - import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} -import org.apache.commons.io.FilenameUtils import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} @@ -50,7 +45,7 @@ private[spark] class ExecutorPodFactoryImpl( mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], - shuffleManager: Option[KubernetesExternalShuffleManager]) + executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider) extends ExecutorPodFactory { import ExecutorPodFactoryImpl._ @@ -179,9 +174,8 @@ private[spark] class ExecutorPodFactoryImpl( .withContainerPort(port._2) .build() }) - val shuffleVolumesWithMounts = - shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts) - .getOrElse(Seq.empty) + val shuffleVolumesWithMounts = executorLocalDirVolumeProvider + .getExecutorLocalDirVolumesWithMounts val executorContainer = new ContainerBuilder() .withName(s"executor") @@ -267,46 +261,9 @@ private[spark] class ExecutorPodFactoryImpl( nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) - val (executorPodWithTempLocalDirs, executorContainerWithTempLocalDirs) = - if (shuffleManager.isEmpty) { - // If we're not using the external shuffle manager, we should use emptyDir volumes for - // shuffle directories since it's important for disk I/O for these directories to be - // performant. If the user has not provided a local directory, instead of using the - // Java temporary directory, we create one instead, because we want to avoid - // mounting an emptyDir which overlaps with an existing path in the Docker image. - // Java's temporary directory path is typically /tmp or a similar path, which is - // likely to exist in most images. 
- val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) - val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") - .withNewEmptyDir().endEmptyDir() - .build() - } - val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { - case (volume, path) => - new VolumeMountBuilder() - .withName(volume.getName) - .withMountPath(path) - .build() - } - // Setting the SPARK_LOCAL_DIRS environment variable will force the executor to use the - // generated directory if the user did not provide one, as opposed to using the Java - // temporary directory. This also overrides the value of spark.local.dir in SparkConf, - // which is intended. See Utils#getConfiguredLocalDirs(). - (new PodBuilder(executorPodWithNodeAffinity) - .editSpec() - .addToVolumes(localDirVolumes: _*) - .endSpec() - .build(), - new ContainerBuilder(initBootstrappedExecutorContainer) - .addToVolumeMounts(localDirVolumeMounts: _*) - .build()) - } else (executorPodWithNodeAffinity, initBootstrappedExecutorContainer) - - new PodBuilder(executorPodWithTempLocalDirs) + new PodBuilder(executorPodWithNodeAffinity) .editSpec() - .addToContainers(executorContainerWithTempLocalDirs) + .addToContainers(initBootstrappedExecutorContainer) .endSpec() .build() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 587420611544a..60260e2931c29 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -125,6 +125,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit kubernetesExternalShuffleClient)) } else None + val executorLocalDirVolumeProvider = new ExecutorLocalDirVolumeProviderImpl( + sparkConf, kubernetesShuffleManager) val executorPodFactory = new ExecutorPodFactoryImpl( sparkConf, NodeAffinityExecutorPodModifierImpl, @@ -132,7 +134,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit mountSmallFilesBootstrap, executorInitContainerBootstrap, executorInitContainerSecretVolumePlugin, - kubernetesShuffleManager) + executorLocalDirVolumeProvider) val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala new file mode 100644 index 0000000000000..f3baf5b9f739a --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}
+import org.mockito.Mockito.{verify, verifyNoMoreInteractions, when}
+import org.scalatest.mock.MockitoSugar.mock
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class ExecutorLocalDirVolumeProviderSuite extends SparkFunSuite {
+
+  test("Delegates to the external shuffle manager implementation if present.") {
+    val externalShuffleManager = mock[KubernetesExternalShuffleManager]
+    val mockVolume = new VolumeBuilder()
+      .withName("local-dir")
+      .withNewHostPath("/tmp/spark-local-dirs")
+      .build()
+    val mockVolumeMount = new VolumeMountBuilder()
+      .withName("local-dir")
+      .withMountPath("/tmp/spark-local-dirs-mount")
+      .build()
+    when(externalShuffleManager.getExecutorShuffleDirVolumesWithMounts)
+      .thenReturn(Seq((mockVolume, mockVolumeMount)))
+    val volumeProvider = new ExecutorLocalDirVolumeProviderImpl(
+      new SparkConf(false), Some(externalShuffleManager))
+    assert(volumeProvider.getExecutorLocalDirVolumesWithMounts ===
+      Seq((mockVolume, mockVolumeMount)))
+    verify(externalShuffleManager).getExecutorShuffleDirVolumesWithMounts
+    verifyNoMoreInteractions(externalShuffleManager)
+  }
+
+  test("Provides EmptyDir volumes for each local dir in spark.local.dir.") {
+    val localDirs = Seq("/tmp/test-local-dir-1", "/tmp/test-local-dir-2")
+    val sparkConf = new SparkConf(false).set("spark.local.dir", localDirs.mkString(","))
+    val volumeProvider = new ExecutorLocalDirVolumeProviderImpl(sparkConf, None)
+    val localDirVolumesWithMounts = volumeProvider.getExecutorLocalDirVolumesWithMounts
+    assert(localDirVolumesWithMounts.size === 2)
+    localDirVolumesWithMounts.zip(localDirs).zipWithIndex.foreach {
+      case (((localDirVolume, localDirVolumeMount), expectedDirMountPath), index) =>
+        val dirName = expectedDirMountPath.substring(
+          expectedDirMountPath.lastIndexOf('/') + 1, expectedDirMountPath.length)
+        assert(localDirVolume.getName === s"spark-local-dir-$index-$dirName")
+        assert(localDirVolume.getEmptyDir != null)
+        assert(localDirVolumeMount.getName === localDirVolume.getName)
+        assert(localDirVolumeMount.getMountPath === expectedDirMountPath)
+      case unknown => throw new IllegalArgumentException(s"Unexpected object: $unknown")
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index a0ddaf4f71786..bb09cb801b5a9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -16,21 +16,18 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import scala.collection.JavaConverters._
-
-import 
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, VolumeBuilder, VolumeMountBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.commons.io.FilenameUtils
-import org.mockito.{AdditionalAnswers, MockitoAnnotations}
-import org.mockito.Matchers.{any, eq => mockitoEq}
+import org.mockito.{AdditionalAnswers, Mock, Mockito, MockitoAnnotations}
+import org.mockito.Matchers.any
 import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{constants, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.k8s.{PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrap, MountSmallFilesBootstrapImpl}
 
 class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
@@ -52,7 +49,12 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .endStatus()
     .build()
   private var baseConf: SparkConf = _
-  private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier])
+
+  @Mock
+  private var nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier = _
+
+  @Mock
+  private var executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider = _
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -60,15 +62,12 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
       .set(EXECUTOR_DOCKER_IMAGE, executorImage)
-  }
-
-  private var kubernetesClient: KubernetesClient = _
-
-  override def beforeEach(cmap: org.scalatest.ConfigMap) {
-    reset(nodeAffinityExecutorPodModifier)
     when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
       any(classOf[Pod]), any(classOf[Map[String, Int]])))
       .thenAnswer(AdditionalAnswers.returnsFirstArg())
+    when(executorLocalDirVolumeProvider.getExecutorLocalDirVolumesWithMounts).thenReturn(Seq.empty)
   }
+  private var kubernetesClient: KubernetesClient = _
 
   test("basic executor pod has reasonable defaults") {
     val factory = new ExecutorPodFactoryImpl(
@@ -78,7 +77,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      None)
+      executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -112,7 +111,13 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
 
     val factory = new ExecutorPodFactoryImpl(
-      conf, nodeAffinityExecutorPodModifier, None, None, None, None, None)
+      conf,
+      nodeAffinityExecutorPodModifier,
+      None,
+      None,
+      None,
+      None,
+      executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -133,7 +138,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      None)
+      executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -167,7 +172,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       Some(initContainerBootstrap),
       None,
-      None)
+      executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -175,52 +180,37 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
 
     assert(executor.getMetadata.getAnnotations.size() === 1)
-    assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION))
+    assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION))
     checkOwnerReferences(executor, driverPodUid)
   }
 
-  test("the shuffle-service adds a volume mount") {
-    val conf = baseConf.clone()
-    conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value")
-    conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default")
-
-    val shuffleManager = mock(classOf[KubernetesExternalShuffleManager])
-    when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({
-      val shuffleDirs = Seq("/tmp")
-      shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) =>
-        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
-        val volume = new VolumeBuilder()
-          .withName(volumeName)
-          .withNewHostPath(shuffleDir)
-          .build()
-        val volumeMount = new VolumeMountBuilder()
-          .withName(volumeName)
-          .withMountPath(shuffleDir)
-          .build()
-        (volume, volumeMount)
-      }
-    })
-
+  test("The local dir volume provider's returned volumes and volume mounts should be added.") {
+    Mockito.reset(executorLocalDirVolumeProvider)
+    val localDirVolume = new VolumeBuilder()
+      .withName("local-dir")
+      .withNewEmptyDir().endEmptyDir()
+      .build()
+    val localDirVolumeMount = new VolumeMountBuilder()
+      .withName("local-dir")
+      .withMountPath("/tmp")
+      .build()
+    when(executorLocalDirVolumeProvider.getExecutorLocalDirVolumesWithMounts)
+      .thenReturn(Seq((localDirVolume, localDirVolumeMount)))
     val factory = new ExecutorPodFactoryImpl(
-      conf,
+      baseConf,
       nodeAffinityExecutorPodModifier,
       None,
       None,
       None,
       None,
-      Some(shuffleManager))
+      executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
-
-    verify(nodeAffinityExecutorPodModifier, times(1))
-      .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
-
+    assert(executor.getSpec.getVolumes.size === 1)
+    assert(executor.getSpec.getVolumes.contains(localDirVolume))
     assert(executor.getSpec.getContainers.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1)
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "0-tmp")
-    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0)
-      .getMountPath === "/tmp")
-    checkOwnerReferences(executor, driverPodUid)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size === 1)
+    assert(executor.getSpec.getContainers.get(0).getVolumeMounts.contains(localDirVolumeMount))
   }
 
   test("Small-files add a secret & secret volume mount to the container") {
@@ -234,7 +224,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       Some(smallFiles),
       None,
       None,
-      None)
+      executorLocalDirVolumeProvider)
 
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
"dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -261,7 +251,13 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") val factory = new ExecutorPodFactoryImpl( - conf, nodeAffinityExecutorPodModifier, None, None, None, None, None) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) @@ -285,14 +281,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef // Check that the expected environment variables are present. private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = { val defaultEnvs = Map( - constants.ENV_EXECUTOR_ID -> "1", - constants.ENV_DRIVER_URL -> "dummy", - constants.ENV_EXECUTOR_CORES -> "1", - constants.ENV_EXECUTOR_MEMORY -> "1g", - constants.ENV_APPLICATION_ID -> "dummy", - constants.ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*", - constants.ENV_EXECUTOR_POD_IP -> null, - constants.ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars + ENV_EXECUTOR_ID -> "1", + ENV_DRIVER_URL -> "dummy", + ENV_EXECUTOR_CORES -> "1", + ENV_EXECUTOR_MEMORY -> "1g", + ENV_APPLICATION_ID -> "dummy", + ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*", + ENV_EXECUTOR_POD_IP -> null, + ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size) From 463cf3fa55e8a7f751533173a10b6e6f21fd9f66 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 16 Oct 2017 14:42:12 -0700 Subject: [PATCH 8/8] Remove extraneous constant --- .../src/main/scala/org/apache/spark/deploy/k8s/constants.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index cee81b3808d7e..95d7f284f86da 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -71,7 +71,6 @@ package object constants { private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" - private[spark] val ENV_SPARK_LOCAL_DIRS = "SPARK_LOCAL_DIRS" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"