diff --git a/conf/kubernetes-shuffle-service.yaml b/conf/kubernetes-shuffle-service.yaml index 6f02106ff06c5..b22adb265fd18 100644 --- a/conf/kubernetes-shuffle-service.yaml +++ b/conf/kubernetes-shuffle-service.yaml @@ -32,7 +32,7 @@ spec: volumes: - name: temp-volume hostPath: - path: '/tmp' # change this path according to your cluster configuration. + path: '/tmp/spark-local' # change this path according to your cluster configuration. containers: - name: shuffle # This is an official image that is built @@ -41,7 +41,7 @@ spec: image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0 imagePullPolicy: IfNotPresent volumeMounts: - - mountPath: '/tmp' + - mountPath: '/tmp/spark-local' name: temp-volume # more volumes can be mounted here. # The spark job must be configured to use these diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 3e9096d681642..99d356044b146 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -222,7 +222,7 @@ Below is an example submission: local:///opt/spark/examples/src/main/python/pi.py 100 ``` -## Dynamic Executor Scaling +## Dynamic Allocation in Kubernetes Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) @@ -245,6 +245,7 @@ the command may then look like the following: --class org.apache.spark.examples.GroupByTest \ --master k8s://: \ --kubernetes-namespace default \ + --conf spark.local.dir=/tmp/spark-local \ --conf spark.app.name=group-by-test \ --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \ --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \ @@ -254,6 +255,14 @@ the command may then look like the following: --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \ local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2 +The external shuffle service must mount directories that can be shared with the executor pods. The provided example +YAML spec mounts a hostPath volume into the external shuffle service pods, but these hostPath volumes must also be mounted +into the executors. When using the external shuffle service, the directories specified in the `spark.local.dir` +configuration are mounted as hostPath volumes into all of the executor containers. To ensure that one does not +accidentally mount the incorrect hostPath volumes, the value of `spark.local.dir` must be specified in your +application's configuration when using Kubernetes, even though it defaults to the JVM's temporary directory when using +other cluster managers. 
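[Editorial note, not part of the patch] As a rough sketch of what the documentation above describes: each comma-separated entry in `spark.local.dir` becomes one hostPath volume plus a mount at the same path inside every executor container. The helper name `hostPathVolumesFor` below is hypothetical and only illustrative; the real logic lives in `KubernetesExternalShuffleManagerImpl.getExecutorShuffleDirVolumesWithMounts` further down in this patch.

```scala
import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
import org.apache.commons.io.FilenameUtils

object LocalDirHostPathSketch {
  // Illustration only: turn the comma-separated spark.local.dir value into
  // (hostPath volume, volume mount) pairs, one per configured directory.
  def hostPathVolumesFor(localDirs: String): Seq[(Volume, VolumeMount)] = {
    localDirs.split(",").toSeq.zipWithIndex.map { case (dir, index) =>
      val volumeName = s"$index-${FilenameUtils.getBaseName(dir)}"
      val volume = new VolumeBuilder()
        .withName(volumeName)
        .withNewHostPath(dir)
        .build()
      val mount = new VolumeMountBuilder()
        .withName(volumeName)
        .withMountPath(dir)
        .build()
      (volume, mount)
    }
  }
}

// e.g. hostPathVolumesFor("/tmp/spark-local") yields one hostPath volume named
// "0-spark-local", mounted at /tmp/spark-local in each executor container.
```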
+ ## Advanced ### Securing the Resource Staging Server with TLS diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index fc08c0ad42f82..0e35e04ff5803 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -157,12 +157,6 @@ package object config extends Logging { .stringConf .createOptional - private[spark] val KUBERNETES_SHUFFLE_DIR = - ConfigBuilder("spark.kubernetes.shuffle.dir") - .doc("Path to the shared shuffle directories.") - .stringConf - .createOptional - private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI = ConfigBuilder("spark.kubernetes.shuffle.apiServer.url") .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index d8aec7d3c5bd7..95d7f284f86da 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -102,4 +102,5 @@ package object constants { private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L + private[spark] val GENERATED_LOCAL_DIR_MOUNT_ROOT = "/mnt/tmp/spark-local" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index fd251637ce210..92ec8e5d85260 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator +import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{SystemClock, Utils} @@ -104,6 +105,9 @@ private[spark] class DriverConfigurationStepsOrchestrator( val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( submissionSparkConf, kubernetesResourceNamePrefix) + val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep( + submissionSparkConf) + val pythonStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) @@ -181,7 +185,8 @@ private[spark] class DriverConfigurationStepsOrchestrator( 
initialSubmissionStep, driverAddressStep, kubernetesCredentialsStep, - dependencyResolutionStep) ++ + dependencyResolutionStep, + localDirectoryMountConfigurationStep) ++ submittedDependenciesBootstrapSteps ++ pythonStep.toSeq ++ mountSecretsStep.toSeq diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala new file mode 100644 index 0000000000000..3f9ba8af74162 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps + +import java.nio.file.Paths +import java.util.UUID + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.constants._ + +/** + * Configures local directories that the driver and executors should use for temporary storage. + * + * Note that we have different semantics for scratch space in Kubernetes versus the other cluster + * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary + * directory. This is because we will mount either emptyDir volumes for both the driver and + * executors, or hostPath volumes for the executors and an emptyDir for the driver. In either + * case, the mount paths need to be directories that do not exist in the base container images. + * But the Java temporary directory is typically a directory like /tmp which exists in most + * container images. + * + * The solution is twofold: + * - When not using an external shuffle service, a reasonable default is to create a new directory + * with a random name and set that to be the value of `spark.local.dir`. + * - When using the external shuffle service, it is risky to assume that the user intends to mount + * the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that + * spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the + * paths that have to be mounted. 
+ */ +private[spark] class LocalDirectoryMountConfigurationStep( + submissionSparkConf: SparkConf, + randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}") + extends DriverConfigurationStep { + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir") + val isUsingExternalShuffle = submissionSparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED) + val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) { + require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" + + " using the external shuffle service in Kubernetes. These directories should map to" + + " the paths that are mounted into the external shuffle service pods.") + configuredLocalDirs.get + } else { + // If we don't use the external shuffle service, local directories should be randomized if + // not provided. + configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}") + } + val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",") + // It's worth noting that we always use an emptyDir volume for the directories on the driver, + // because the driver does not need a hostPath to share its scratch space with any other pod. + // The driver itself will decide on whether to use a hostPath volume or an emptyDir volume for + // these directories on the executors. (see ExecutorPodFactory and + // KubernetesExternalClusterManager) + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set( + "spark.local.dir", resolvedLocalDirsSingleString) + driverSpec.copy( + driverPod = new PodBuilder(driverSpec.driverPod) + .editSpec() + .addToVolumes(localDirVolumes: _*) + .endSpec() + .build(), + driverContainer = new ContainerBuilder(driverSpec.driverContainer) + .addToVolumeMounts(localDirVolumeMounts: _*) + .build(), + driverSparkConf = resolvedDriverSparkConf + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala new file mode 100644 index 0000000000000..2b35fd6a513f5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.nio.file.Paths + +import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +private[spark] trait ExecutorLocalDirVolumeProvider { + def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] +} + +private[spark] class ExecutorLocalDirVolumeProviderImpl( + sparkConf: SparkConf, + kubernetesExternalShuffleManager: Option[KubernetesExternalShuffleManager]) + extends ExecutorLocalDirVolumeProvider { + override def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] = { + kubernetesExternalShuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts) + .getOrElse { + // If we're not using the external shuffle manager, we should use emptyDir volumes for + // shuffle directories since it's important for disk I/O for these directories to be + // performant. If the user has not provided a local directory, instead of using the + // Java temporary directory, we create one instead, because we want to avoid + // mounting an emptyDir which overlaps with an existing path in the Docker image. + // Java's temporary directory path is typically /tmp or a similar path, which is + // likely to exist in most images. + val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf) + val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}") + .withNewEmptyDir().endEmptyDir() + .build() + } + val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map { + case (volume, path) => + new VolumeMountBuilder() + .withName(volume.getName) + .withMountPath(path) + .build() + } + localDirVolumes.zip(localDirVolumeMounts) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 6f4ba1c8b888f..98a0d879b6a58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,10 +16,9 @@ */ package org.apache.spark.scheduler.cluster.k8s +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} - import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ @@ -46,7 +45,7 @@ 
private[spark] class ExecutorPodFactoryImpl( mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], - shuffleManager: Option[KubernetesExternalShuffleManager]) + executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider) extends ExecutorPodFactory { import ExecutorPodFactoryImpl._ @@ -175,9 +174,8 @@ private[spark] class ExecutorPodFactoryImpl( .withContainerPort(port._2) .build() }) - val shuffleVolumesWithMounts = - shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts) - .getOrElse(Seq.empty) + val shuffleVolumesWithMounts = executorLocalDirVolumeProvider + .getExecutorLocalDirVolumesWithMounts val executorContainer = new ContainerBuilder() .withName(s"executor") @@ -262,6 +260,7 @@ private[spark] class ExecutorPodFactoryImpl( val executorPodWithNodeAffinity = nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) + new PodBuilder(executorPodWithNodeAffinity) .editSpec() .addToContainers(initBootstrappedExecutorContainer) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index cd92df439a7e6..60260e2931c29 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -113,17 +113,20 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) - val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) { + val kubernetesShuffleManager = if (sparkConf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) { val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( - SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), - sc.env.securityManager, - sc.env.securityManager.isAuthenticationEnabled()) + SparkTransportConf.fromSparkConf(sparkConf, "shuffle"), + sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled()) Some(new KubernetesExternalShuffleManagerImpl( - sparkConf, - kubernetesClient, - kubernetesExternalShuffleClient)) + sparkConf, + kubernetesClient, + kubernetesExternalShuffleClient)) } else None + val executorLocalDirVolumeProvider = new ExecutorLocalDirVolumeProviderImpl( + sparkConf, kubernetesShuffleManager) val executorPodFactory = new ExecutorPodFactoryImpl( sparkConf, NodeAffinityExecutorPodModifierImpl, @@ -131,7 +134,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit mountSmallFilesBootstrap, executorInitContainerBootstrap, executorInitContainerSecretVolumePlugin, - kubernetesShuffleManager) + executorLocalDirVolumeProvider) val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala index 181f22dfe3dcc..388e2b17f4fdd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala @@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl( s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") } private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337) - private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map { - _.split(",") - }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf)) + private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf) private var shufflePodCache = scala.collection.mutable.Map[String, String]() private var watcher: Watch = _ @@ -140,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl( } override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { + // TODO: Using hostPath for the local directory will also make it such that the + // other uses of the local directory - broadcasting and caching - will also write + // to the directory that the shuffle service is aware of. It would be better for + // these directories to be separate so that the lifetime of the non-shuffle scratch + // space is tied to an emptyDir instead of the hostPath. This requires a change in + // core Spark as well. shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) => val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index 6f5d5e571c443..1199f033cf06a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -52,7 +52,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[BaseDriverConfigurationStep], classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], - classOf[DependencyResolutionStep]) + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep]) } test("Submission steps 
with an init-container.") { @@ -76,6 +77,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[InitContainerBootstrapStep]) } @@ -98,6 +100,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[PythonStep]) } @@ -120,6 +123,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSmallLocalFilesStep]) } @@ -144,6 +148,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], classOf[MountSecretsStep]) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala new file mode 100644 index 0000000000000..5ce199a5df857 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/LocalDirectoryMountConfigurationStepSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import java.nio.file.Paths + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.submitsteps.{KubernetesDriverSpec, LocalDirectoryMountConfigurationStep} + +private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite { + + test("When using the external shuffle service, the local directories must be provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, true) + val configurationStep = new LocalDirectoryMountConfigurationStep(sparkConf) + try { + configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) + fail("The configuration step should have failed without local dirs.") + } catch { + case e: Throwable => + assert(e.getMessage === "requirement failed: spark.local.dir must be provided explicitly" + + " when using the external shuffle service in Kubernetes. These directories should map" + + " to the paths that are mounted into the external shuffle service pods.") + } + } + + test("When not using the external shuffle service, a random directory should be set" + + " for local dirs if one is not provided.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf, () => "local-dir") + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/local-dir")) + } + + test("When not using the external shuffle service, provided local dirs should be mounted as" + + " emptyDirs.") { + val sparkConf = new SparkConf(false) + .set(org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED, false) + .set("spark.local.dir", "/mnt/tmp/spark-local,/var/tmp/spark-local") + val configurationStep = new LocalDirectoryMountConfigurationStep( + sparkConf) + val resolvedDriverSpec = configurationStep.configureDriver( + KubernetesDriverSpec.initialSpec(sparkConf)) + testLocalDirsMatch(resolvedDriverSpec, Seq("/mnt/tmp/spark-local", "/var/tmp/spark-local")) + } + + private def testLocalDirsMatch( + resolvedDriverSpec: KubernetesDriverSpec, expectedLocalDirs: Seq[String]): Unit = { + assert(resolvedDriverSpec.driverSparkConf.get("spark.local.dir").split(",") === + expectedLocalDirs) + expectedLocalDirs + .zip(resolvedDriverSpec.driverPod.getSpec.getVolumes.asScala) + .zipWithIndex + .foreach { + case ((dir, volume), index) => + assert(volume.getEmptyDir != null) + val fileName = Paths.get(dir).getFileName.toString + assert(volume.getName === s"spark-local-dir-$index-$fileName") + } + + expectedLocalDirs + .zip(resolvedDriverSpec.driverContainer.getVolumeMounts.asScala) + .zipWithIndex + .foreach { + case ((dir, volumeMount), index) => + val fileName = Paths.get(dir).getFileName.toString + assert(volumeMount.getName === s"spark-local-dir-$index-$fileName") + assert(volumeMount.getMountPath === dir) + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala new file mode 100644 index 0000000000000..f3baf5b9f739a --- /dev/null +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProviderSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder} +import org.mockito.Mockito.{verify, verifyNoMoreInteractions, when} +import org.scalatest.mock.MockitoSugar.mock + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class ExecutorLocalDirVolumeProviderSuite extends SparkFunSuite { + + test("Delegates to the external shuffle manager implementation if present.") { + val externalShuffleManager = mock[KubernetesExternalShuffleManager] + val mockVolume = new VolumeBuilder() + .withName("local-dir") + .withNewHostPath("/tmp/spark-local-dirs") + .build() + val mockVolumeMount = new VolumeMountBuilder() + .withName("local-dir") + .withMountPath("/tmp/spark-local-dirs-mount") + .build() + when(externalShuffleManager.getExecutorShuffleDirVolumesWithMounts) + .thenReturn(Seq((mockVolume, mockVolumeMount))) + val volumeProvider = new ExecutorLocalDirVolumeProviderImpl( + new SparkConf(false), Some(externalShuffleManager)) + assert(volumeProvider.getExecutorLocalDirVolumesWithMounts === + Seq((mockVolume, mockVolumeMount))) + verify(externalShuffleManager).getExecutorShuffleDirVolumesWithMounts + verifyNoMoreInteractions(externalShuffleManager) + } + + test("Provides EmptyDir volumes for each local dir in spark.local.dir.") { + val localDirs = Seq("/tmp/test-local-dir-1", "/tmp/test-local-dir-2") + val sparkConf = new SparkConf(false).set("spark.local.dir", localDirs.mkString(",")) + val volumeProvider = new ExecutorLocalDirVolumeProviderImpl(sparkConf, None) + val localDirVolumesWithMounts = volumeProvider.getExecutorLocalDirVolumesWithMounts + assert(localDirVolumesWithMounts.size === 2) + localDirVolumesWithMounts.zip(localDirs).zipWithIndex.foreach { + case (((localDirVolume, localDirVolumeMount), expectedDirMountPath), index) => + val dirName = expectedDirMountPath.substring( + expectedDirMountPath.lastIndexOf('/') + 1, expectedDirMountPath.length) + assert(localDirVolume.getName === s"spark-local-dir-$index-$dirName") + assert(localDirVolume.getEmptyDir != null) + assert(localDirVolumeMount.getName === localDirVolume.getName) + assert(localDirVolumeMount.getMountPath === expectedDirMountPath) + case unknown => throw new IllegalArgumentException(s"Unexpected object: $unknown") + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 6690217557637..bb09cb801b5a9 100644 
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -16,21 +16,18 @@ */ package org.apache.spark.scheduler.cluster.k8s -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{Pod, VolumeBuilder, VolumeMountBuilder, _} +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, VolumeBuilder, VolumeMountBuilder} import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.commons.io.FilenameUtils -import org.mockito.{AdditionalAnswers, MockitoAnnotations} -import org.mockito.Matchers.{any, eq => mockitoEq} +import org.mockito.{AdditionalAnswers, Mock, Mockito, MockitoAnnotations} +import org.mockito.Matchers.any import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} +import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{constants, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.k8s.{PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrap, MountSmallFilesBootstrapImpl} class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { @@ -52,7 +49,12 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .endStatus() .build() private var baseConf: SparkConf = _ - private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier]) + + @Mock + private var nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier = _ + + @Mock + private var executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider = _ before { MockitoAnnotations.initMocks(this) @@ -60,15 +62,12 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(EXECUTOR_DOCKER_IMAGE, executorImage) - } - private var kubernetesClient: KubernetesClient = _ - - override def beforeEach(cmap: org.scalatest.ConfigMap) { - reset(nodeAffinityExecutorPodModifier) when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( any(classOf[Pod]), any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + when(executorLocalDirVolumeProvider.getExecutorLocalDirVolumesWithMounts).thenReturn(Seq.empty) } + private var kubernetesClient: KubernetesClient = _ test("basic executor pod has reasonable defaults") { val factory = new ExecutorPodFactoryImpl( @@ -78,7 +77,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, - None) + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -112,7 +111,13 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") val factory = new ExecutorPodFactoryImpl( - conf, nodeAffinityExecutorPodModifier, None, None, None, None, None) + conf, + 
nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -133,7 +138,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, - None) + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -167,7 +172,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, Some(initContainerBootstrap), None, - None) + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -175,53 +180,37 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) assert(executor.getMetadata.getAnnotations.size() === 1) - assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION)) + assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION)) checkOwnerReferences(executor, driverPodUid) } - test("the shuffle-service adds a volume mount") { - val conf = baseConf.clone() - conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value") - conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") - conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp") - - val shuffleManager = mock(classOf[KubernetesExternalShuffleManager]) - when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({ - val shuffleDirs = Seq("/tmp") - shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) => - val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" - val volume = new VolumeBuilder() - .withName(volumeName) - .withNewHostPath(shuffleDir) - .build() - val volumeMount = new VolumeMountBuilder() - .withName(volumeName) - .withMountPath(shuffleDir) - .build() - (volume, volumeMount) - } - }) - + test("The local dir volume provider's returned volumes and volume mounts should be added.") { + Mockito.reset(executorLocalDirVolumeProvider) + val localDirVolume = new VolumeBuilder() + .withName("local-dir") + .withNewEmptyDir().endEmptyDir() + .build() + val localDirVolumeMount = new VolumeMountBuilder() + .withName("local-dir") + .withMountPath("/tmp") + .build() + when(executorLocalDirVolumeProvider.getExecutorLocalDirVolumesWithMounts) + .thenReturn(Seq((localDirVolume, localDirVolumeMount))) val factory = new ExecutorPodFactoryImpl( - conf, + baseConf, nodeAffinityExecutorPodModifier, None, None, None, None, - Some(shuffleManager)) + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - - verify(nodeAffinityExecutorPodModifier, times(1)) - .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) - + assert(executor.getSpec.getVolumes.size === 1) + assert(executor.getSpec.getVolumes.contains(localDirVolume)) assert(executor.getSpec.getContainers.size() === 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "0-tmp") - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) - .getMountPath === "/tmp") - checkOwnerReferences(executor, driverPodUid) + 
assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.contains(localDirVolumeMount)) } test("Small-files add a secret & secret volume mount to the container") { @@ -235,7 +224,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef Some(smallFiles), None, None, - None) + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -262,7 +251,13 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") val factory = new ExecutorPodFactoryImpl( - conf, nodeAffinityExecutorPodModifier, None, None, None, None, None) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) @@ -286,14 +281,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef // Check that the expected environment variables are present. private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = { val defaultEnvs = Map( - constants.ENV_EXECUTOR_ID -> "1", - constants.ENV_DRIVER_URL -> "dummy", - constants.ENV_EXECUTOR_CORES -> "1", - constants.ENV_EXECUTOR_MEMORY -> "1g", - constants.ENV_APPLICATION_ID -> "dummy", - constants.ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*", - constants.ENV_EXECUTOR_POD_IP -> null, - constants.ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars + ENV_EXECUTOR_ID -> "1", + ENV_DRIVER_URL -> "dummy", + ENV_EXECUTOR_CORES -> "1", + ENV_EXECUTOR_MEMORY -> "1g", + ENV_APPLICATION_ID -> "dummy", + ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*", + ENV_EXECUTOR_POD_IP -> null, + ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index d6e12df9d1ba2..27041207ffdce 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -150,6 +150,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) sparkConf.set("spark.dynamicAllocation.enabled", "true") + sparkConf.set("spark.local.dir", "/tmp") sparkConf.set("spark.shuffle.service.enabled", "true") sparkConf.set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service") sparkConf.set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace)
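[Editorial note, not part of the patch] For reference, an application-side configuration consistent with this change and with the integration test above might look like the following sketch. The values are illustrative and must match whatever hostPath directories the shuffle-service daemonset actually mounts (for example the `/tmp/spark-local` path from `conf/kubernetes-shuffle-service.yaml`).

```scala
import org.apache.spark.SparkConf

object DynamicAllocationOnK8sConfSketch {
  // Illustrative only: with this patch, spark.local.dir is mandatory when the
  // external shuffle service is enabled on Kubernetes, and it must name the
  // same directories that the shuffle-service pods mount as hostPath volumes.
  def conf: SparkConf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.local.dir", "/tmp/spark-local")
    .set("spark.kubernetes.shuffle.namespace", "default")
    .set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service,spark-version=2.2.0")
}
```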