From 0e1238a21019b9de24315056c9361e02a4764ded Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Tue, 28 Aug 2018 14:52:07 +0100 Subject: [PATCH 1/2] [SPARK-25262][K8S] Allow SPARK_LOCAL_DIRS to be tmpfs backed on K8S Adds a configuration option that enables SPARK_LOCAL_DIRS to be backed by Memory backed emptyDir volumes rather than the default which is whatever the kubelet's node storage happens to be --- docs/running-on-kubernetes.md | 13 ++++++++ .../org/apache/spark/deploy/k8s/Config.scala | 9 ++++++ .../k8s/features/LocalDirsFeatureStep.scala | 6 ++++ .../features/LocalDirsFeatureStepSuite.scala | 30 +++++++++++++++++++ 4 files changed, 58 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index c83dad6df1e7..320c28849e90 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -215,6 +215,19 @@ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. +## Local Storage + +Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. + +`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. + +### Using RAM for local storage + +As `emptyDir` volumes use the nodes backing storage for ephemeral storage this default behaviour may not be appropriate for some compute environments. For example if you have diskless nodes with remote storage mounted over a network having lots of executors doing IO to this remote storage may actually degrade performance. + +In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration which will cause the `emptyDir` volumes to be configured as `tmpfs` i.e. RAM backed volumes. When configured like this Sparks local storage usage will count towards your pods memory usage therefore you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties. 
+ + ## Introspection and Debugging These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1b582fe53624..cc8da906f976 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -225,6 +225,15 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_LOCAL_DIRS_TMPFS = + ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") + .doc("If set to true then emptyDir volumes created to back spark.local.dirs will have " + + "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " + + "volumes. This may improve performance but scratch space usage will count towards " + + "your pods memory limit so you may wish to request more memory.") + .booleanConf + .createWithDefault(false) + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 70b307303d14..c4085859f7f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -22,6 +22,7 @@ import java.util.UUID import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( conf: KubernetesConf[_ <: KubernetesRoleSpecificConf], @@ -37,6 +38,7 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs @@ -45,6 +47,10 @@ private[spark] class LocalDirsFeatureStep( new VolumeBuilder() .withName(s"spark-local-dir-${index + 1}") .withNewEmptyDir() + .withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null // Default - use nodes backing storage + }) .endEmptyDir() .build() } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index a339827b819a..acdd07bc594b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} 
import org.mockito.Mockito +import org.scalatest._ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" @@ -111,4 +113,32 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2") .build()) } + + test("Use tmpfs to back default local dir") { + Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") + Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") + Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS) + val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod()) + assert(configuredPod.pod.getSpec.getVolumes.size === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0) === + new VolumeBuilder() + .withName(s"spark-local-dir-1") + .withNewEmptyDir() + .withMedium("Memory") + .endEmptyDir() + .build()) + assert(configuredPod.container.getVolumeMounts.size === 1) + assert(configuredPod.container.getVolumeMounts.get(0) === + new VolumeMountBuilder() + .withName(s"spark-local-dir-1") + .withMountPath(defaultLocalDir) + .build()) + assert(configuredPod.container.getEnv.size === 1) + assert(configuredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue(defaultLocalDir) + .build()) + } } From 8bc9f0e750fd1418eb8f14310e519daec663f713 Mon Sep 17 00:00:00 2001 From: Rob Vesse Date: Thu, 6 Sep 2018 14:30:28 +0100 Subject: [PATCH 2/2] [SPARK-25262][K8S] Address PR comments - Simplify syntax - Indentation fixes - Doc improvements and clarifications --- docs/running-on-kubernetes.md | 12 ++++++++++-- .../scala/org/apache/spark/deploy/k8s/Config.scala | 6 +++--- .../deploy/k8s/features/LocalDirsFeatureStep.scala | 5 +---- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 320c28849e90..4ae7acaae231 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -223,9 +223,9 @@ Spark uses temporary scratch space to spill data to disk during shuffles and oth ### Using RAM for local storage -As `emptyDir` volumes use the nodes backing storage for ephemeral storage this default behaviour may not be appropriate for some compute environments. For example if you have diskless nodes with remote storage mounted over a network having lots of executors doing IO to this remote storage may actually degrade performance. +`emptyDir` volumes use the node's backing storage for ephemeral storage by default; this behaviour may not be appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a network, having lots of executors doing IO to this remote storage may actually degrade performance. -In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration which will cause the `emptyDir` volumes to be configured as `tmpfs` i.e. RAM backed volumes. 
When configured like this Sparks local storage usage will count towards your pods memory usage therefore you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties. +In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM backed volumes. When configured like this, Spark's local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests by increasing the value of `spark.kubernetes.memoryOverheadFactor` as appropriate. ## Introspection and Debugging @@ -797,6 +797,14 @@ specific to Spark on Kubernetes. spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. + + spark.kubernetes.local.dirs.tmpfs + false + + Configure the emptyDir volumes used to back SPARK_LOCAL_DIRS within the Spark driver and executor pods to use tmpfs backing, i.e. RAM. See Local Storage earlier on this page + for more discussion of this. + + spark.kubernetes.memoryOverheadFactor 0.1 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index cc8da906f976..c5f4d6c53b7f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -227,12 +227,12 @@ private[spark] object Config extends Logging { val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") - .doc("If set to true then emptyDir volumes created to back spark.local.dirs will have " + + .doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " + "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " + "volumes. This may improve performance but scratch space usage will count towards " + "your pods memory limit so you may wish to request more memory.") - .booleanConf - .createWithDefault(false) + .booleanConf + .createWithDefault(false) val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index c4085859f7f4..be386e119d46 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -47,10 +47,7 @@ private[spark] class LocalDirsFeatureStep( new VolumeBuilder() .withName(s"spark-local-dir-${index + 1}") .withNewEmptyDir() - .withMedium(useLocalDirTmpFs match { - case true => "Memory" // Use tmpfs - case false => null // Default - use nodes backing storage - }) + .withMedium(if (useLocalDirTmpFs) "Memory" else null) .endEmptyDir() .build() }
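For reference, below is a minimal sketch of how a user might enable the option introduced by this patch from application code. The property names (`spark.kubernetes.local.dirs.tmpfs`, `spark.kubernetes.memoryOverheadFactor`) come from the diffs above; the object name, app name, and the overhead factor value of 0.4 are illustrative assumptions, not values recommended by the patch.

```scala
import org.apache.spark.SparkConf

// Sketch: enabling tmpfs-backed SPARK_LOCAL_DIRS for a Spark-on-Kubernetes app.
object TmpfsLocalDirsExample { // hypothetical example object
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("tmpfs-local-dirs-example") // hypothetical app name
      // Back the emptyDir volumes for SPARK_LOCAL_DIRS with RAM (tmpfs)
      .set("spark.kubernetes.local.dirs.tmpfs", "true")
      // Scratch space now counts against the pod memory limit, so leave extra
      // non-heap headroom; 0.4 is purely illustrative (the default is 0.1)
      .set("spark.kubernetes.memoryOverheadFactor", "0.4")

    // The conf would then be passed to SparkSession/SparkContext as usual.
    println(conf.toDebugString)
  }
}
```

The same properties can also be passed via `--conf` on `spark-submit`; the key point of the docs change is that once tmpfs backing is enabled, additional memory should be requested through the memory overhead rather than relying on node-local disk.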