4 changes: 2 additions & 2 deletions conf/kubernetes-shuffle-service.yaml
@@ -32,7 +32,7 @@ spec:
volumes:
- name: temp-volume
hostPath:
path: '/tmp' # change this path according to your cluster configuration.
path: '/tmp/spark-local' # change this path according to your cluster configuration.
containers:
- name: shuffle
# This is an official image that is built
@@ -41,7 +41,7 @@ spec:
image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0
imagePullPolicy: IfNotPresent
volumeMounts:
- mountPath: '/tmp'
- mountPath: '/tmp/spark-local'
name: temp-volume
# more volumes can be mounted here.
# The spark job must be configured to use these
11 changes: 10 additions & 1 deletion docs/running-on-kubernetes.md
@@ -222,7 +222,7 @@ Below is an example submission:
local:///opt/spark/examples/src/main/python/pi.py 100
```

## Dynamic Executor Scaling
## Dynamic Allocation in Kubernetes

Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
@@ -245,6 +245,7 @@ the command may then look like the following:
--class org.apache.spark.examples.GroupByTest \
--master k8s://<k8s-master>:<port> \
--kubernetes-namespace default \
--conf spark.local.dir=/tmp/spark-local \
--conf spark.app.name=group-by-test \
--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
@@ -254,6 +254,14 @@ the command may then look like the following:
--conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2

The external shuffle service must mount directories that can be shared with the executor pods. The provided example
YAML spec mounts a hostPath volume into the external shuffle service pods, and the same hostPath volumes must also be
mounted into the executors. When the external shuffle service is used, the directories specified in the `spark.local.dir`
configuration are mounted as hostPath volumes into all of the executor containers. To ensure that the wrong hostPath
volumes are not mounted by accident, `spark.local.dir` must be set explicitly in your application's configuration when
running on Kubernetes, even though it defaults to the JVM's temporary directory on other cluster managers.
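
For illustration only, the same requirement expressed programmatically (a sketch; `/tmp/spark-local` is simply the path
used in the example daemonset spec and submission command above):

```scala
import org.apache.spark.SparkConf

// spark.local.dir must name the same directories that the shuffle service
// daemonset mounts as hostPath volumes (here, /tmp/spark-local).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.local.dir", "/tmp/spark-local")
```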

## Advanced

### Securing the Resource Staging Server with TLS
@@ -157,12 +157,6 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_SHUFFLE_DIR =
ConfigBuilder("spark.kubernetes.shuffle.dir")
.doc("Path to the shared shuffle directories.")
.stringConf
.createOptional

private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI =
ConfigBuilder("spark.kubernetes.shuffle.apiServer.url")
.doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.")
@@ -102,4 +102,5 @@ package object constants {
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
private[spark] val GENERATED_LOCAL_DIR_MOUNT_ROOT = "/mnt/tmp/spark-local"
}
@@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{SystemClock, Utils}

@@ -104,6 +105,9 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)

val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
submissionSparkConf)

val pythonStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -181,7 +185,8 @@ private[spark] class DriverConfigurationStepsOrchestrator(
initialSubmissionStep,
driverAddressStep,
kubernetesCredentialsStep,
dependencyResolutionStep) ++
dependencyResolutionStep,
localDirectoryMountConfigurationStep) ++
submittedDependenciesBootstrapSteps ++
pythonStep.toSeq ++
mountSecretsStep.toSeq
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.submitsteps

import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.constants._

/**
* Configures local directories that the driver and executors should use for temporary storage.
*
* Note that we have different semantics for scratch space in Kubernetes versus the other cluster
* managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary
* directory. This is because we will mount either emptyDir volumes for both the driver and
* executors, or hostPath volumes for the executors and an emptyDir for the driver. In either
* case, the mount paths need to be directories that do not exist in the base container images.
* But the Java temporary directory is typically a directory like /tmp which exists in most
* container images.
*
* The solution is twofold:
* - When not using an external shuffle service, a reasonable default is to create a new directory
* with a random name and set that to be the value of `spark.local.dir`.
* - When using the external shuffle service, it is risky to assume that the user intends to mount
* the JVM temporary directory into the pod as a hostPath volume. We therefore require that
* `spark.local.dir` be set explicitly whenever the external shuffle service is enabled, so that
* the user deliberately chooses the paths that have to be mounted.
*/
private[spark] class LocalDirectoryMountConfigurationStep(
submissionSparkConf: SparkConf,
randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}")
extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir")
val isUsingExternalShuffle = submissionSparkConf.get(
org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)
val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) {
require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" +
" using the external shuffle service in Kubernetes. These directories should map to" +
" the paths that are mounted into the external shuffle service pods.")
configuredLocalDirs.get
} else {
// If we don't use the external shuffle service, local directories should be randomized if
// not provided.
configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}")
}
val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",")
// Note that we always use emptyDir volumes for these directories on the driver, because the
// driver does not need a hostPath to share its scratch space with any other pod. Whether the
// executors use hostPath or emptyDir volumes for these directories is decided elsewhere on the
// driver side (see ExecutorPodFactory and KubernetesClusterManager).
val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
.withNewEmptyDir().endEmptyDir()
.build()
}
val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
case (volume, path) =>
new VolumeMountBuilder()
.withName(volume.getName)
.withMountPath(path)
.build()
}
val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set(
"spark.local.dir", resolvedLocalDirsSingleString)
driverSpec.copy(
driverPod = new PodBuilder(driverSpec.driverPod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
.endSpec()
.build(),
driverContainer = new ContainerBuilder(driverSpec.driverContainer)
.addToVolumeMounts(localDirVolumeMounts: _*)
.build(),
driverSparkConf = resolvedDriverSparkConf
)
}
}
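
For reference, a hedged sketch of what this step produces for a single configured directory (assuming
`spark.local.dir=/tmp/spark-local`, index 0), using the same fabric8 builders as the code above:

```scala
import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}

// The volume name combines the directory index with the directory's base name;
// the emptyDir volume is mounted at the configured local directory path.
val volume = new VolumeBuilder()
  .withName("spark-local-dir-0-spark-local")
  .withNewEmptyDir().endEmptyDir()
  .build()
val mount = new VolumeMountBuilder()
  .withName(volume.getName) // "spark-local-dir-0-spark-local"
  .withMountPath("/tmp/spark-local")
  .build()
```

The resolved `spark.local.dir` value is also written back into the driver's SparkConf, so the executors later resolve
the same directories.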
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import java.nio.file.Paths

import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

private[spark] trait ExecutorLocalDirVolumeProvider {
def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)]
}

private[spark] class ExecutorLocalDirVolumeProviderImpl(
sparkConf: SparkConf,
kubernetesExternalShuffleManager: Option[KubernetesExternalShuffleManager])
extends ExecutorLocalDirVolumeProvider {
override def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] = {
kubernetesExternalShuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts)
.getOrElse {
// If we're not using the external shuffle manager, we use emptyDir volumes for the
// local directories, since disk I/O on these directories needs to be performant. If the
// user has not provided a local directory, we create one with a generated name rather
// than falling back to the Java temporary directory, because we want to avoid mounting
// an emptyDir over a path that already exists in the Docker image. The Java temporary
// directory is typically /tmp or a similar path, which is likely to exist in most images.
val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf)
val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
.withNewEmptyDir().endEmptyDir()
.build()
}
val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
case (volume, path) =>
new VolumeMountBuilder()
.withName(volume.getName)
.withMountPath(path)
.build()
}
localDirVolumes.zip(localDirVolumeMounts)
}
}
}
@@ -16,10 +16,9 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
@@ -46,7 +45,7 @@ private[spark] class ExecutorPodFactoryImpl(
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
shuffleManager: Option[KubernetesExternalShuffleManager])
executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider)
extends ExecutorPodFactory {

import ExecutorPodFactoryImpl._
@@ -175,9 +174,8 @@
.withContainerPort(port._2)
.build()
})
val shuffleVolumesWithMounts =
shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts)
.getOrElse(Seq.empty)
val shuffleVolumesWithMounts = executorLocalDirVolumeProvider
.getExecutorLocalDirVolumesWithMounts

val executorContainer = new ContainerBuilder()
.withName(s"executor")
@@ -262,6 +260,7 @@
val executorPodWithNodeAffinity =
nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
executorPodWithInitContainer, nodeToLocalTaskCount)

new PodBuilder(executorPodWithNodeAffinity)
.editSpec()
.addToContainers(initBootstrappedExecutorContainer)
@@ -113,25 +113,28 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))

val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
val kubernetesShuffleManager = if (sparkConf.get(
org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
sc.env.securityManager,
sc.env.securityManager.isAuthenticationEnabled())
Some(new KubernetesExternalShuffleManagerImpl(
sparkConf,
kubernetesClient,
kubernetesExternalShuffleClient))
} else None

val executorLocalDirVolumeProvider = new ExecutorLocalDirVolumeProviderImpl(
sparkConf, kubernetesShuffleManager)
val executorPodFactory = new ExecutorPodFactoryImpl(
sparkConf,
NodeAffinityExecutorPodModifierImpl,
mountSecretBootstrap,
mountSmallFilesBootstrap,
executorInitContainerBootstrap,
executorInitContainerSecretVolumePlugin,
kubernetesShuffleManager)
executorLocalDirVolumeProvider)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
@@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
}
private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map {
_.split(",")
}.getOrElse(Utils.getConfiguredLocalDirs(sparkConf))
private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf)
private var shufflePodCache = scala.collection.mutable.Map[String, String]()
private var watcher: Watch = _

@@ -140,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
}

override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
// TODO: Using hostPath for the local directory will also make it such that the
// other uses of the local directory - broadcasting and caching - will also write
// to the directory that the shuffle service is aware of. It would be better for
// these directories to be separate so that the lifetime of the non-shuffle scratch
// space is tied to an emptyDir instead of the hostPath. This requires a change in
// core Spark as well.

Review comment: I was trying to think through whether there are any security concerns here, but I don't think there are any additional ones. Any data in the broadcast/cache files would, with high probability, also be accessible in the shuffle files, so no new data is exposed. We have other methods to secure the shuffle dirs between peer executors on the same node.

shuffleDirs.zipWithIndex.map {
case (shuffleDir, shuffleDirIndex) =>
val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"