From 51a877a7d86b6c0f18d381ba9300de18ccef9189 Mon Sep 17 00:00:00 2001 From: madanadit Date: Wed, 4 Apr 2018 18:02:25 -0700 Subject: [PATCH 01/18] Support mounting hostPath volumes for executors --- .../org/apache/spark/deploy/k8s/Config.scala | 7 +++++ .../spark/deploy/k8s/KubernetesUtils.scala | 31 ++++++++++++++++++- .../features/BasicExecutorFeatureStep.scala | 14 ++++++--- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4086970ffb256..639f227b5874f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -117,6 +117,13 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") + val KUBERNETES_EXECUTOR_VOLUMES = + ConfigBuilder("spark.kubernetes.executor.volumes") + .doc("List of volumes mounted into the executor container. The format of this property is " + + "a comma-separated list of mappings following the form hostPath:containerPath:name") + .stringConf + .createWithDefault("") + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index ee629068ad90d..bebeea1edeca3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model.LocalObjectReference +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkConf import org.apache.spark.util.Utils @@ -37,6 +37,35 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } + /** + * Parse a comma-delimited list of volume specs, each of which + * takes the form hostPath:containerPath:name and add to pod. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param volumes list of volume specs + * @return the pod with the init-container added to the list of InitContainers + */ + def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { + val podBuilder = new PodBuilder(pod).editOrNewSpec() + val containerBuilder = new ContainerBuilder(container) + volumes.split(",").map(_.split(":")).map { spec => + spec match { + case Array(hostPath, containerPath, name) => + podBuilder + .withVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build()) + containerBuilder.addToVolumeMounts(new VolumeMountBuilder() + .withMountPath(containerPath) + .withName(name) + .build()) + case spec => + None + } + } + (podBuilder.endSpec().build(), containerBuilder.build()) + } + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d22097587aafe..41e2ba11ab8b6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -17,11 +17,10 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} @@ -170,7 +169,14 @@ private[spark] class BasicExecutorFeatureStep( .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) .endSpec() .build() - SparkPod(executorPod, containerWithLimitCores) + + val volumes = kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES) + val executorPodAndContainerWithVolumes = + KubernetesUtils.addVolumes(executorPod, containerWithLimitCores, volumes) + val executorPodWithVolumes = executorPodAndContainerWithVolumes._1 + val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2 + + SparkPod(executorPodWithVolumes, executorContainerWithVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty From 4ada724ab2f6e79553fa05475b216884f9ba127a Mon Sep 17 00:00:00 2001 From: madanadit Date: Thu, 5 Apr 2018 13:57:45 -0700 Subject: [PATCH 02/18] Read mode for mounted volumes --- .../org/apache/spark/deploy/k8s/Config.scala | 2 +- .../spark/deploy/k8s/KubernetesUtils.scala | 38 ++++++++++++++++--- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 639f227b5874f..1e41d0f001bcf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -120,7 +120,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_VOLUMES = ConfigBuilder("spark.kubernetes.executor.volumes") .doc("List of volumes mounted into the executor container. The format of this property is " + - "a comma-separated list of mappings following the form hostPath:containerPath:name") + "a comma-separated list of mappings following the form hostPath:containerPath[:ro|rw]") .stringConf .createWithDefault("") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index bebeea1edeca3..b068da02ea9eb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -38,8 +38,8 @@ private[spark] object KubernetesUtils { } /** - * Parse a comma-delimited list of volume specs, each of which - * takes the form hostPath:containerPath:name and add to pod. + * Parse a comma-delimited list of volume specs, each of which takes the form + * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container. * * @param pod original specification of the pod * @param container original specification of the container @@ -49,16 +49,44 @@ private[spark] object KubernetesUtils { def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { val podBuilder = new PodBuilder(pod).editOrNewSpec() val containerBuilder = new ContainerBuilder(container) + var volumeCount = 0 volumes.split(",").map(_.split(":")).map { spec => spec match { - case Array(hostPath, containerPath, name) => + case Array(hostPath, containerPath) => podBuilder .withVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build()) + .withHostPath(new HostPathVolumeSource(hostPath)) + .withName(s"executor-volume-$volumeCount") + .build()) containerBuilder.addToVolumeMounts(new VolumeMountBuilder() .withMountPath(containerPath) - .withName(name) + .withName(s"executor-volume-$volumeCount") .build()) + volumeCount += 1 + case Array(hostPath, containerPath, "ro") => + podBuilder + .withVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath)) + .withName(s"executor-volume-$volumeCount") + .build()) + containerBuilder.addToVolumeMounts(new VolumeMountBuilder() + .withMountPath(containerPath) + .withName(s"executor-volume-$volumeCount") + .withReadOnly(true) + .build()) + volumeCount += 1 + case Array(hostPath, containerPath, "rw") => + podBuilder + .withVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath)) + .withName(s"executor-volume-$volumeCount") + .build()) + containerBuilder.addToVolumeMounts(new VolumeMountBuilder() + .withMountPath(containerPath) + .withName(s"executor-volume-$volumeCount") + .withReadOnly(false) + .build()) + volumeCount += 1 case spec => None } From 17258a3967b6d8b2170c3c7d7186449c3155fef1 Mon Sep 17 00:00:00 2001 From: madanadit Date: Thu, 5 Apr 2018 15:05:07 -0700 Subject: [PATCH 03/18] Refactor --- .../deploy/k8s/features/BasicExecutorFeatureStep.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 41e2ba11ab8b6..7cf2e3f3c7fb7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -170,11 +170,10 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - val volumes = kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES) - val executorPodAndContainerWithVolumes = - KubernetesUtils.addVolumes(executorPod, containerWithLimitCores, volumes) - val executorPodWithVolumes = executorPodAndContainerWithVolumes._1 - val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2 + val (executorPodWithVolumes, executorContainerWithVolumes) = + KubernetesUtils.addVolumes(executorPod, + executorContainer, + kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES)) SparkPod(executorPodWithVolumes, executorContainerWithVolumes) } From 6fff716cbe42e184059660cf2009f594048a6420 Mon Sep 17 00:00:00 2001 From: madanadit Date: Fri, 6 Apr 2018 11:18:09 -0700 Subject: [PATCH 04/18] Fix style --- .../spark/deploy/k8s/KubernetesUtils.scala | 77 +++++++++---------- 1 file changed, 35 insertions(+), 42 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index b068da02ea9eb..545d9ee0bc0c3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -38,57 +38,50 @@ private[spark] object KubernetesUtils { } /** - * Parse a comma-delimited list of volume specs, each of which takes the form - * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container. - * - * @param pod original specification of the pod - * @param container original specification of the container - * @param volumes list of volume specs - * @return the pod with the init-container added to the list of InitContainers + * Parse a comma-delimited list of volume specs, each of which takes the form + * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param volumes list of volume specs + * @return the pod with the init-container added to the list of InitContainers */ def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { val podBuilder = new PodBuilder(pod).editOrNewSpec() val containerBuilder = new ContainerBuilder(container) var volumeCount = 0 volumes.split(",").map(_.split(":")).map { spec => + var hostPath: Option[String] = None + var containerPath: Option[String] = None + var readOnly: Option[Boolean] = None spec match { - case Array(hostPath, containerPath) => - podBuilder - .withVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath)) - .withName(s"executor-volume-$volumeCount") - .build()) - containerBuilder.addToVolumeMounts(new VolumeMountBuilder() - .withMountPath(containerPath) - .withName(s"executor-volume-$volumeCount") - .build()) - volumeCount += 1 - case Array(hostPath, containerPath, "ro") => - podBuilder - .withVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath)) - .withName(s"executor-volume-$volumeCount") - .build()) - containerBuilder.addToVolumeMounts(new VolumeMountBuilder() - .withMountPath(containerPath) - .withName(s"executor-volume-$volumeCount") - .withReadOnly(true) - .build()) - volumeCount += 1 - case Array(hostPath, containerPath, "rw") => - podBuilder - .withVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath)) - .withName(s"executor-volume-$volumeCount") - .build()) - containerBuilder.addToVolumeMounts(new VolumeMountBuilder() - .withMountPath(containerPath) + case Array(hostPathV, containerPathV) => + hostPath = Some(hostPathV) + containerPath = Some(containerPathV) + case Array(hostPathV, containerPathV, "ro") => + hostPath = Some(hostPathV) + containerPath = Some(containerPathV) + readOnly = Some(true) + case Array(hostPathV, containerPathV, "rw") => + hostPath = Some(hostPathV) + containerPath = Some(containerPathV) + readOnly = Some(false) + } + if (hostPath.isDefined && containerPath.isDefined) { + podBuilder + .withVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath.get)) .withName(s"executor-volume-$volumeCount") - .withReadOnly(false) .build()) - volumeCount += 1 - case spec => - None + val volumeBuilder = new VolumeMountBuilder() + .withMountPath(containerPath.get) + .withName(s"executor-volume-$volumeCount") + if (readOnly.isDefined) { + containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build()) + } else { + containerBuilder.addToVolumeMounts(volumeBuilder.build()) + } + volumeCount += 1 } } (podBuilder.endSpec().build(), containerBuilder.build()) From f961b33aa34ad7908ac6d77ded251e18bf6e835a Mon Sep 17 00:00:00 2001 From: madanadit Date: Fri, 6 Apr 2018 12:57:45 -0700 Subject: [PATCH 05/18] Add unit tests --- .../spark/deploy/k8s/KubernetesUtils.scala | 4 +- .../BasicExecutorFeatureStepSuite.scala | 55 +++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 545d9ee0bc0c3..b25e34209c5cf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -71,11 +71,11 @@ private[spark] object KubernetesUtils { podBuilder .withVolumes(new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath.get)) - .withName(s"executor-volume-$volumeCount") + .withName(s"hostPath-volume-$volumeCount") .build()) val volumeBuilder = new VolumeMountBuilder() .withMountPath(containerPath.get) - .withName(s"executor-volume-$volumeCount") + .withName(s"hostPath-volume-$volumeCount") if (readOnly.isDefined) { containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build()) } else { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index a764f7630b5c8..ef5806835cc18 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -152,6 +152,61 @@ class BasicExecutorFeatureStepSuite checkOwnerReferences(executor.pod, DRIVER_POD_UID) } + test("single executor hostPath volume gets mounted") { + val conf = baseConf.clone() + conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount:/opt/mount") + + val step = new BasicExecutorFeatureStep( + KubernetesConf( + conf, + KubernetesExecutorSpecificConf("1", DRIVER_POD), + RESOURCE_NAME_PREFIX, + APP_ID, + LABELS, + ANNOTATIONS, + Map.empty, + Map.empty)) + val executor = step.configurePod(SparkPod.initialPod()) + + assert(executor.container.getImage === EXECUTOR_IMAGE) + assert(executor.container.getVolumeMounts.size() === 1) + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-1") + assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount") + + assert(executor.pod.getSpec.getVolumes.size() === 1) + assert(executor.pod.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount") + + checkOwnerReferences(executor.pod, DRIVER_POD_UID) + } + + test("multiple executor hostPath volumes get mounted") { + val conf = baseConf.clone() + conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2") + val step = new BasicExecutorFeatureStep( + KubernetesConf( + conf, + KubernetesExecutorSpecificConf("1", DRIVER_POD), + RESOURCE_NAME_PREFIX, + APP_ID, + LABELS, + ANNOTATIONS, + Map.empty, + Map.empty)) + val executor = step.configurePod(SparkPod.initialPod()) + + assert(executor.container.getImage === EXECUTOR_IMAGE) + assert(executor.container.getVolumeMounts.size() === 2) + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-1") + assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount1") + assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-volume-2") + assert(executor.container.getVolumeMounts.get(1).getMountPath === "/opt/mount2") + + assert(executor.pod.getSpec.getVolumes.size() === 2) + assert(executor.pod.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount1") + assert(executor.pod.getSpec.getVolumes.get(1).getHostPath === "/tmp/mount2") + checkOwnerReferences(executor.pod, DRIVER_POD_UID) + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) From af4d9baae8d637215d2be645b377fa5dbd714e94 Mon Sep 17 00:00:00 2001 From: madanadit Date: Fri, 6 Apr 2018 13:00:18 -0700 Subject: [PATCH 06/18] Update comment --- .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index b25e34209c5cf..8d882ddcdb321 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -44,7 +44,7 @@ private[spark] object KubernetesUtils { * @param pod original specification of the pod * @param container original specification of the container * @param volumes list of volume specs - * @return the pod with the init-container added to the list of InitContainers + * @return a tuple of (pod with the volume(s) added, container with mount(s) added) */ def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { val podBuilder = new PodBuilder(pod).editOrNewSpec() From 13e10493fa6f3b40c3eb406d0dc2f88d09ce20b8 Mon Sep 17 00:00:00 2001 From: madanadit Date: Fri, 6 Apr 2018 16:56:31 -0700 Subject: [PATCH 07/18] Fix unit tests --- .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 5 +++-- .../deploy/k8s/features/BasicExecutorFeatureStepSuite.scala | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 8d882ddcdb321..b2f5b6801b8bf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -66,10 +66,11 @@ private[spark] object KubernetesUtils { hostPath = Some(hostPathV) containerPath = Some(containerPathV) readOnly = Some(false) + case spec => + None } if (hostPath.isDefined && containerPath.isDefined) { - podBuilder - .withVolumes(new VolumeBuilder() + podBuilder.addToVolumes(new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath.get)) .withName(s"hostPath-volume-$volumeCount") .build()) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index ef5806835cc18..9204e21de7424 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -170,7 +170,7 @@ class BasicExecutorFeatureStepSuite assert(executor.container.getImage === EXECUTOR_IMAGE) assert(executor.container.getVolumeMounts.size() === 1) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-1") + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-0") assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount") assert(executor.pod.getSpec.getVolumes.size() === 1) @@ -196,9 +196,9 @@ class BasicExecutorFeatureStepSuite assert(executor.container.getImage === EXECUTOR_IMAGE) assert(executor.container.getVolumeMounts.size() === 2) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-1") + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-0") assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount1") - assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-volume-2") + assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-volume-1") assert(executor.container.getVolumeMounts.get(1).getMountPath === "/opt/mount2") assert(executor.pod.getSpec.getVolumes.size() === 2) From 7a25d76e386c0d8157eba0ccc0517bea3b7f7f0e Mon Sep 17 00:00:00 2001 From: madanadit Date: Mon, 16 Apr 2018 15:41:26 -0700 Subject: [PATCH 08/18] Fix typo --- .../spark/deploy/k8s/features/BasicExecutorFeatureStep.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 7cf2e3f3c7fb7..3e65c50c0b3f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -172,7 +172,7 @@ private[spark] class BasicExecutorFeatureStep( val (executorPodWithVolumes, executorContainerWithVolumes) = KubernetesUtils.addVolumes(executorPod, - executorContainer, + containerWithLimitCores, kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES)) SparkPod(executorPodWithVolumes, executorContainerWithVolumes) From a5a277a065dec156e976478de7767062bcf1da13 Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 14:51:58 -0700 Subject: [PATCH 09/18] Change configuration format --- .../org/apache/spark/deploy/k8s/Config.scala | 15 +-- .../spark/deploy/k8s/KubernetesUtils.scala | 109 ++++++++++++------ .../deploy/k8s/KubernetesVolumeSpec.scala | 26 +++++ .../features/BasicExecutorFeatureStep.scala | 10 +- 4 files changed, 113 insertions(+), 47 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1e41d0f001bcf..96f2a0a88d467 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -117,13 +117,6 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") - val KUBERNETES_EXECUTOR_VOLUMES = - ConfigBuilder("spark.kubernetes.executor.volumes") - .doc("List of volumes mounted into the executor container. The format of this property is " + - "a comma-separated list of mappings following the form hostPath:containerPath[:ro|rw]") - .stringConf - .createWithDefault("") - val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") @@ -169,10 +162,18 @@ private[spark] object Config extends Logging { val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." + val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes." val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets." + val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes." + + val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" + val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" + val KUBERNETES_VOLUMES_PATH_KEY = "path" + val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" + val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index b2f5b6801b8bf..9eb8008704780 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,9 +16,12 @@ */ package org.apache.spark.deploy.k8s +import scala.collection.immutable.HashMap + import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.util.Utils private[spark] object KubernetesUtils { @@ -38,52 +41,86 @@ private[spark] object KubernetesUtils { } /** - * Parse a comma-delimited list of volume specs, each of which takes the form - * hostPath:containerPath[:ro|rw]; and add volume to pod and volume mount to container. + * Extract Spark hostPath volume configuration properties with a given name prefix and + * return the result as a Map. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @return a Map storing with volume name as key and spec as value + */ + def parseHostPathVolumesWithPrefix( + sparkConf: SparkConf, + prefix: String): Map[String, KubernetesVolumeSpec] = { + val volumes : Map[String, KubernetesVolumeSpec] = HashMap[String, KubernetesVolumeSpec]() + val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY") + // Extract volume names + properties foreach { + case (k, _) => + val keys = k.split(".") + if (keys.nonEmpty && !volumes.contains(keys(0))) { + volumes(keys(0)) = KubernetesVolumeSpec.emptySpec() + } + } + // Populate spec + volumes foreach { + case (name, spec) => + properties foreach { + case (k, v) => + k.split(".") match { + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => + spec.mountPath = Some(v) + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => + spec.mountReadOnly = Some(v.toBoolean) + case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => + if (!spec.optionsSpec.isDefined) { + spec.optionsSpec = Some(Map.empty) + } + spec.optionsSpec(option) = Some(v) + case _ => + None + } + } + } + volumes + } + + /** + * Given hostPath volume specs, add volume to pod and volume mount to container. * * @param pod original specification of the pod * @param container original specification of the container - * @param volumes list of volume specs + * @param volumes list of named volume specs * @return a tuple of (pod with the volume(s) added, container with mount(s) added) */ - def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { + def addHostPathVolumes( + pod: Pod, + container: Container, + volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = { val podBuilder = new PodBuilder(pod).editOrNewSpec() val containerBuilder = new ContainerBuilder(container) - var volumeCount = 0 - volumes.split(",").map(_.split(":")).map { spec => - var hostPath: Option[String] = None - var containerPath: Option[String] = None - var readOnly: Option[Boolean] = None - spec match { - case Array(hostPathV, containerPathV) => - hostPath = Some(hostPathV) - containerPath = Some(containerPathV) - case Array(hostPathV, containerPathV, "ro") => - hostPath = Some(hostPathV) - containerPath = Some(containerPathV) - readOnly = Some(true) - case Array(hostPathV, containerPathV, "rw") => - hostPath = Some(hostPathV) - containerPath = Some(containerPathV) - readOnly = Some(false) - case spec => - None - } - if (hostPath.isDefined && containerPath.isDefined) { - podBuilder.addToVolumes(new VolumeBuilder() + volumes foreach { + case (name, spec) => + var hostPath: Option[String] = None + if (spec.optionsSpec.isDefined && spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { + hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) + } + if (hostPath.isDefined && spec.mountPath.isDefined) { + podBuilder.addToVolumes(new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath.get)) - .withName(s"hostPath-volume-$volumeCount") + .withName(name) .build()) - val volumeBuilder = new VolumeMountBuilder() - .withMountPath(containerPath.get) - .withName(s"hostPath-volume-$volumeCount") - if (readOnly.isDefined) { - containerBuilder.addToVolumeMounts(volumeBuilder.withReadOnly(readOnly.get).build()) - } else { - containerBuilder.addToVolumeMounts(volumeBuilder.build()) + val volumeBuilder = new VolumeMountBuilder() + .withMountPath(spec.mountPath.get) + .withName(name) + if (spec.mountReadOnly.isDefined) { + containerBuilder + .addToVolumeMounts(volumeBuilder + .withReadOnly(spec.mountReadOnly.get) + .build()) + } else { + containerBuilder.addToVolumeMounts(volumeBuilder.build()) + } } - volumeCount += 1 - } } (podBuilder.endSpec().build(), containerBuilder.build()) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala new file mode 100644 index 0000000000000..5eaa63a089653 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +private[spark] case class KubernetesVolumeSpec( + var mountPath: Option[String], + var mountReadOnly: Option[Boolean], + var optionsSpec: Option[Map[String, String]]) + +private[spark] object KubernetesVolumeSpec { + def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, None) +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 3e65c50c0b3f4..581f60b59a4ab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -170,12 +170,14 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - val (executorPodWithVolumes, executorContainerWithVolumes) = - KubernetesUtils.addVolumes(executorPod, + val executorHostPathVolumesSpec = KubernetesUtils.parseHostPathVolumesWithPrefix( + kubernetesConf.sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) + val (executorPodWithHostPathVolumes, executorContainerWithHostPathVolumes) = + KubernetesUtils.addHostPathVolumes(executorPod, containerWithLimitCores, - kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES)) + executorHostPathVolumesSpec) - SparkPod(executorPodWithVolumes, executorContainerWithVolumes) + SparkPod(executorPodWithHostPathVolumes, executorContainerWithHostPathVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty From 772128e13895d6f313efa3cbc38947db51c3e493 Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 16:34:37 -0700 Subject: [PATCH 10/18] Fix build --- .../apache/spark/deploy/k8s/KubernetesUtils.scala | 15 ++++++--------- .../spark/deploy/k8s/KubernetesVolumeSpec.scala | 6 ++++-- .../k8s/features/BasicExecutorFeatureStep.scala | 1 + 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 9eb8008704780..55e729ea71acd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.immutable.HashMap +import scala.collection.mutable.HashMap import io.fabric8.kubernetes.api.model._ @@ -51,14 +51,14 @@ private[spark] object KubernetesUtils { def parseHostPathVolumesWithPrefix( sparkConf: SparkConf, prefix: String): Map[String, KubernetesVolumeSpec] = { - val volumes : Map[String, KubernetesVolumeSpec] = HashMap[String, KubernetesVolumeSpec]() + val volumes = HashMap[String, KubernetesVolumeSpec]() val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY") // Extract volume names properties foreach { case (k, _) => val keys = k.split(".") if (keys.nonEmpty && !volumes.contains(keys(0))) { - volumes(keys(0)) = KubernetesVolumeSpec.emptySpec() + volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) } } // Populate spec @@ -72,16 +72,13 @@ private[spark] object KubernetesUtils { case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => spec.mountReadOnly = Some(v.toBoolean) case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => - if (!spec.optionsSpec.isDefined) { - spec.optionsSpec = Some(Map.empty) - } - spec.optionsSpec(option) = Some(v) + spec.optionsSpec.update(option, v) case _ => None } } } - volumes + volumes.toMap } /** @@ -101,7 +98,7 @@ private[spark] object KubernetesUtils { volumes foreach { case (name, spec) => var hostPath: Option[String] = None - if (spec.optionsSpec.isDefined && spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { + if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) } if (hostPath.isDefined && spec.mountPath.isDefined) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index 5eaa63a089653..afd6524aed548 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -16,11 +16,13 @@ */ package org.apache.spark.deploy.k8s +import scala.collection.mutable.Map + private[spark] case class KubernetesVolumeSpec( var mountPath: Option[String], var mountReadOnly: Option[Boolean], - var optionsSpec: Option[Map[String, String]]) + var optionsSpec: Map[String, String]) private[spark] object KubernetesVolumeSpec { - def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, None) + def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, Map()) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 581f60b59a4ab..14acabb39ee51 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException From a393b92c93d60bdcf96025ae24d6c1fbecf17f9b Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 16:44:08 -0700 Subject: [PATCH 11/18] Fix test --- .../k8s/features/BasicExecutorFeatureStepSuite.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 9204e21de7424..1d528fcc91118 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -154,7 +154,8 @@ class BasicExecutorFeatureStepSuite test("single executor hostPath volume gets mounted") { val conf = baseConf.clone() - conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount:/opt/mount") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.mount.path", "/opt/mount") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.options.path", "/tmp/mount") val step = new BasicExecutorFeatureStep( KubernetesConf( @@ -181,7 +182,10 @@ class BasicExecutorFeatureStepSuite test("multiple executor hostPath volumes get mounted") { val conf = baseConf.clone() - conf.set(KUBERNETES_EXECUTOR_VOLUMES, "/tmp/mount1:/opt/mount1,/tmp/mount2:/opt/mount2") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.mount.path", "/opt/mount1") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.options.path", "/tmp/mount1") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-2.mount.path", "/opt/mount2") + conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-2.options.path", "/tmp/mount2") val step = new BasicExecutorFeatureStep( KubernetesConf( conf, @@ -196,9 +200,9 @@ class BasicExecutorFeatureStepSuite assert(executor.container.getImage === EXECUTOR_IMAGE) assert(executor.container.getVolumeMounts.size() === 2) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-0") + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-1") assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount1") - assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-volume-1") + assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-2") assert(executor.container.getVolumeMounts.get(1).getMountPath === "/opt/mount2") assert(executor.pod.getSpec.getVolumes.size() === 2) From b9b3dcb22ded628f0abe1caa0beb2b15da6ccc49 Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 17:39:14 -0700 Subject: [PATCH 12/18] Fetch properties correctly --- .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 55e729ea71acd..6ab19ba4f5f00 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -52,7 +52,7 @@ private[spark] object KubernetesUtils { sparkConf: SparkConf, prefix: String): Map[String, KubernetesVolumeSpec] = { val volumes = HashMap[String, KubernetesVolumeSpec]() - val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY") + val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY.") // Extract volume names properties foreach { case (k, _) => From 81a7811b06a0195e84ffeddc135875da6c500a7e Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 22:21:11 -0700 Subject: [PATCH 13/18] Fix test cases --- .../spark/deploy/k8s/KubernetesUtils.scala | 22 +++++++++---------- .../BasicExecutorFeatureStepSuite.scala | 8 +++---- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 6ab19ba4f5f00..821b6f42585d3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -52,27 +52,27 @@ private[spark] object KubernetesUtils { sparkConf: SparkConf, prefix: String): Map[String, KubernetesVolumeSpec] = { val volumes = HashMap[String, KubernetesVolumeSpec]() - val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY.") + val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY.").toList // Extract volume names - properties foreach { - case (k, _) => - val keys = k.split(".") + properties.foreach { + k => + val keys = k._1.split("\\.") if (keys.nonEmpty && !volumes.contains(keys(0))) { volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) } } // Populate spec - volumes foreach { + volumes.foreach { case (name, spec) => - properties foreach { - case (k, v) => - k.split(".") match { + properties.foreach { + k => + k._1.split("\\.") match { case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => - spec.mountPath = Some(v) + spec.mountPath = Some(k._2) case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => - spec.mountReadOnly = Some(v.toBoolean) + spec.mountReadOnly = Some(k._2.toBoolean) case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => - spec.optionsSpec.update(option, v) + spec.optionsSpec.update(option, k._2) case _ => None } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 1d528fcc91118..520e5fe597688 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -171,11 +171,11 @@ class BasicExecutorFeatureStepSuite assert(executor.container.getImage === EXECUTOR_IMAGE) assert(executor.container.getVolumeMounts.size() === 1) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-volume-0") + assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-1") assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount") assert(executor.pod.getSpec.getVolumes.size() === 1) - assert(executor.pod.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount") + assert(executor.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount") checkOwnerReferences(executor.pod, DRIVER_POD_UID) } @@ -206,8 +206,8 @@ class BasicExecutorFeatureStepSuite assert(executor.container.getVolumeMounts.get(1).getMountPath === "/opt/mount2") assert(executor.pod.getSpec.getVolumes.size() === 2) - assert(executor.pod.getSpec.getVolumes.get(0).getHostPath === "/tmp/mount1") - assert(executor.pod.getSpec.getVolumes.get(1).getHostPath === "/tmp/mount2") + assert(executor.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount1") + assert(executor.pod.getSpec.getVolumes.get(1).getHostPath.getPath === "/tmp/mount2") checkOwnerReferences(executor.pod, DRIVER_POD_UID) } From 95d1b0d8c681cd46f47c8ab1692172d0b3b0aba8 Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 23:42:56 -0700 Subject: [PATCH 14/18] Abstract tests --- .../BasicExecutorFeatureStepSuite.scala | 60 +++++++------------ 1 file changed, 22 insertions(+), 38 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 520e5fe597688..865a2d1dcc251 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -153,39 +153,21 @@ class BasicExecutorFeatureStepSuite } test("single executor hostPath volume gets mounted") { - val conf = baseConf.clone() - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.mount.path", "/opt/mount") - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.options.path", "/tmp/mount") - - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), - RESOURCE_NAME_PREFIX, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty)) - val executor = step.configurePod(SparkPod.initialPod()) - - assert(executor.container.getImage === EXECUTOR_IMAGE) - assert(executor.container.getVolumeMounts.size() === 1) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-1") - assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount") - - assert(executor.pod.getSpec.getVolumes.size() === 1) - assert(executor.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount") - - checkOwnerReferences(executor.pod, DRIVER_POD_UID) + hostPathVolumeTest(1) } test("multiple executor hostPath volumes get mounted") { + hostPathVolumeTest(2) + } + + private def hostPathVolumeTest(numVolumes: Int): Unit = { val conf = baseConf.clone() - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.mount.path", "/opt/mount1") - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-1.options.path", "/tmp/mount1") - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-2.mount.path", "/opt/mount2") - conf.set("spark.kubernetes.executor.volumes.hostPath.hostPath-2.options.path", "/tmp/mount2") + for (i <- 0 until numVolumes) { + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path", + s"/opt/mount$i") + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path", + s"/tmp/mount$i") + } val step = new BasicExecutorFeatureStep( KubernetesConf( conf, @@ -199,15 +181,17 @@ class BasicExecutorFeatureStepSuite val executor = step.configurePod(SparkPod.initialPod()) assert(executor.container.getImage === EXECUTOR_IMAGE) - assert(executor.container.getVolumeMounts.size() === 2) - assert(executor.container.getVolumeMounts.get(0).getName === "hostPath-1") - assert(executor.container.getVolumeMounts.get(0).getMountPath === "/opt/mount1") - assert(executor.container.getVolumeMounts.get(1).getName === "hostPath-2") - assert(executor.container.getVolumeMounts.get(1).getMountPath === "/opt/mount2") - - assert(executor.pod.getSpec.getVolumes.size() === 2) - assert(executor.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/tmp/mount1") - assert(executor.pod.getSpec.getVolumes.get(1).getHostPath.getPath === "/tmp/mount2") + assert(executor.container.getVolumeMounts.size() === numVolumes) + assert(executor.pod.getSpec.getVolumes.size() === numVolumes) + for (i <- 0 until numVolumes) { + assert(executor.container.getVolumeMounts.asScala + .exists(_.getName == s"hostPath-$i")) + assert(executor.container.getVolumeMounts.asScala + .exists(_.getMountPath == s"/opt/mount$i")) + assert(executor.pod.getSpec.getVolumes.asScala + .exists(_.getHostPath.getPath == s"/tmp/mount$i")) + } + checkOwnerReferences(executor.pod, DRIVER_POD_UID) } From facde97b365a7acb02c41e5ef076a9ea0f1edff9 Mon Sep 17 00:00:00 2001 From: madanadit Date: Tue, 17 Apr 2018 23:59:43 -0700 Subject: [PATCH 15/18] Add readOnly option --- .../BasicExecutorFeatureStepSuite.scala | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 865a2d1dcc251..d568ea6e1896b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -153,20 +153,32 @@ class BasicExecutorFeatureStepSuite } test("single executor hostPath volume gets mounted") { - hostPathVolumeTest(1) + hostPathVolumeTest(1, false) } test("multiple executor hostPath volumes get mounted") { - hostPathVolumeTest(2) + hostPathVolumeTest(2, false) } - private def hostPathVolumeTest(numVolumes: Int): Unit = { + test("single executor hostPath volume gets mounted w/ readOnly option") { + hostPathVolumeTest(1, true) + } + + test("multiple executor hostPath volumes get mounted w/ readOnly option") { + hostPathVolumeTest(2, true) + } + + private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { val conf = baseConf.clone() for (i <- 0 until numVolumes) { conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path", s"/opt/mount$i") conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path", s"/tmp/mount$i") + if (readOnly) { + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.readOnly", + "true") + } } val step = new BasicExecutorFeatureStep( KubernetesConf( @@ -185,11 +197,12 @@ class BasicExecutorFeatureStepSuite assert(executor.pod.getSpec.getVolumes.size() === numVolumes) for (i <- 0 until numVolumes) { assert(executor.container.getVolumeMounts.asScala - .exists(_.getName == s"hostPath-$i")) - assert(executor.container.getVolumeMounts.asScala - .exists(_.getMountPath == s"/opt/mount$i")) + .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) assert(executor.pod.getSpec.getVolumes.asScala - .exists(_.getHostPath.getPath == s"/tmp/mount$i")) + .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) + if (readOnly) { + assert(executor.container.getVolumeMounts.get(i).getReadOnly == true) + } } checkOwnerReferences(executor.pod, DRIVER_POD_UID) From ccdc7990ca8995ff86f46647f8a2949848f06380 Mon Sep 17 00:00:00 2001 From: madanadit Date: Wed, 18 Apr 2018 00:06:17 -0700 Subject: [PATCH 16/18] Fix test --- .../deploy/k8s/features/BasicExecutorFeatureStepSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index d568ea6e1896b..387b28483a699 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -176,7 +176,7 @@ class BasicExecutorFeatureStepSuite conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path", s"/tmp/mount$i") if (readOnly) { - conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.readOnly", + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.readOnly", "true") } } From 7c1be8aff51a462bf96012fafbbfec765424de53 Mon Sep 17 00:00:00 2001 From: madanadit Date: Wed, 18 Apr 2018 00:24:55 -0700 Subject: [PATCH 17/18] Driver hostPath volumes with tests --- .../k8s/features/BasicDriverFeatureStep.scala | 8 ++- .../BasicDriverFeatureStepSuite.scala | 59 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 07bdccbe0479d..0707c7af4a236 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -109,7 +109,13 @@ private[spark] class BasicDriverFeatureStep( .addToImagePullSecrets(conf.imagePullSecrets(): _*) .endSpec() .build() - SparkPod(driverPod, driverContainer) + + val driverHostPathVolumesSpec = KubernetesUtils.parseHostPathVolumesWithPrefix( + conf.sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) + val (driverPodWithHostPathVolumes, driverContainerWithHostPathVolumes) = + KubernetesUtils.addHostPathVolumes(driverPod, driverContainer, driverHostPathVolumesSpec) + + SparkPod(driverPodWithHostPathVolumes, driverContainerWithHostPathVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index eee85b8baa730..cd419ead9e199 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -150,4 +150,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } + + test("single driver hostPath volume gets mounted") { + hostPathVolumeTest(1, false) + } + + test("multiple driver hostPath volumes get mounted") { + hostPathVolumeTest(2, false) + } + + test("single driver hostPath volume gets mounted w/ readOnly option") { + hostPathVolumeTest(1, true) + } + + test("multiple driver hostPath volumes get mounted w/ readOnly option") { + hostPathVolumeTest(2, true) + } + + private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(CONTAINER_IMAGE, "spark-driver:latest") + for (i <- 0 until numVolumes) { + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.path", + s"/opt/mount$i") + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.options.path", + s"/tmp/mount$i") + if (readOnly) { + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.readOnly", + "true") + } + } + val kubernetesConf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + None, + APP_NAME, + MAIN_CLASS, + APP_ARGS), + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty) + val step = new BasicDriverFeatureStep(kubernetesConf) + val driver = step.configurePod(SparkPod.initialPod()) + + assert(driver.container.getVolumeMounts.size() === numVolumes) + assert(driver.pod.getSpec.getVolumes.size() === numVolumes) + for (i <- 0 until numVolumes) { + assert(driver.container.getVolumeMounts.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) + assert(driver.pod.getSpec.getVolumes.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) + if (readOnly) { + assert(driver.container.getVolumeMounts.get(i).getReadOnly == true) + } + } + } } From f482dfc370bace0c5315a2514604be2965bcbbaf Mon Sep 17 00:00:00 2001 From: madanadit Date: Thu, 19 Apr 2018 16:56:58 -0700 Subject: [PATCH 18/18] Refactor --- .../spark/deploy/k8s/KubernetesUtils.scala | 87 ----------- .../deploy/k8s/KubernetesVolumeUtils.scala | 142 ++++++++++++++++++ .../k8s/features/BasicDriverFeatureStep.scala | 16 +- .../features/BasicExecutorFeatureStep.scala | 13 +- 4 files changed, 157 insertions(+), 101 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 821b6f42585d3..5bc070147d3a8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,12 +16,7 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.mutable.HashMap - -import io.fabric8.kubernetes.api.model._ - import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.util.Utils private[spark] object KubernetesUtils { @@ -40,88 +35,6 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } - /** - * Extract Spark hostPath volume configuration properties with a given name prefix and - * return the result as a Map. - * - * @param sparkConf Spark configuration - * @param prefix the given property name prefix - * @return a Map storing with volume name as key and spec as value - */ - def parseHostPathVolumesWithPrefix( - sparkConf: SparkConf, - prefix: String): Map[String, KubernetesVolumeSpec] = { - val volumes = HashMap[String, KubernetesVolumeSpec]() - val properties = sparkConf.getAllWithPrefix(s"$prefix$KUBERNETES_VOLUMES_HOSTPATH_KEY.").toList - // Extract volume names - properties.foreach { - k => - val keys = k._1.split("\\.") - if (keys.nonEmpty && !volumes.contains(keys(0))) { - volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) - } - } - // Populate spec - volumes.foreach { - case (name, spec) => - properties.foreach { - k => - k._1.split("\\.") match { - case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => - spec.mountPath = Some(k._2) - case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => - spec.mountReadOnly = Some(k._2.toBoolean) - case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => - spec.optionsSpec.update(option, k._2) - case _ => - None - } - } - } - volumes.toMap - } - - /** - * Given hostPath volume specs, add volume to pod and volume mount to container. - * - * @param pod original specification of the pod - * @param container original specification of the container - * @param volumes list of named volume specs - * @return a tuple of (pod with the volume(s) added, container with mount(s) added) - */ - def addHostPathVolumes( - pod: Pod, - container: Container, - volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = { - val podBuilder = new PodBuilder(pod).editOrNewSpec() - val containerBuilder = new ContainerBuilder(container) - volumes foreach { - case (name, spec) => - var hostPath: Option[String] = None - if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { - hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) - } - if (hostPath.isDefined && spec.mountPath.isDefined) { - podBuilder.addToVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath.get)) - .withName(name) - .build()) - val volumeBuilder = new VolumeMountBuilder() - .withMountPath(spec.mountPath.get) - .withName(name) - if (spec.mountReadOnly.isDefined) { - containerBuilder - .addToVolumeMounts(volumeBuilder - .withReadOnly(spec.mountReadOnly.get) - .build()) - } else { - containerBuilder.addToVolumeMounts(volumeBuilder.build()) - } - } - } - (podBuilder.endSpec().build(), containerBuilder.build()) - } - def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala new file mode 100644 index 0000000000000..ed05acd23f5a9 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import scala.collection.mutable.HashMap + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ + +private[spark] object KubernetesVolumeUtils { + + /** + * Given hostPath volume specs, add volume to pod and volume mount to container. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param sparkConf Spark configuration + * @param prefix the prefix for volume configuration + * @return a tuple of (pod with the volume(s) added, container with mount(s) added) + */ + def addVolumes( + pod: Pod, + container: Container, + sparkConf: SparkConf, + prefix : String): (Pod, Container) = { + val hostPathVolumeSpecs = parseHostPathVolumesWithPrefix(sparkConf, prefix) + addHostPathVolumes(pod, container, hostPathVolumeSpecs) + } + + /** + * Extract Spark volume configuration properties with a given name prefix. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @param volumeTypeKey the given property name prefix + * @return a Map storing with volume name as key and spec as value + */ + def parseVolumesWithPrefix( + sparkConf: SparkConf, + prefix: String, + volumeTypeKey: String): Map[String, KubernetesVolumeSpec] = { + val volumes = HashMap[String, KubernetesVolumeSpec]() + val properties = sparkConf.getAllWithPrefix(s"$prefix$volumeTypeKey.").toList + // Extract volume names + properties.foreach { + k => + val keys = k._1.split("\\.") + if (keys.nonEmpty && !volumes.contains(keys(0))) { + volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) + } + } + // Populate spec + volumes.foreach { + case (name, spec) => + properties.foreach { + k => + k._1.split("\\.") match { + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => + spec.mountPath = Some(k._2) + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => + spec.mountReadOnly = Some(k._2.toBoolean) + case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => + spec.optionsSpec.update(option, k._2) + case _ => + None + } + } + } + volumes.toMap + } + + /** + * Extract Spark hostPath volume configuration properties with a given name prefix and + * return the result as a Map. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @return a Map storing with volume name as key and spec as value + */ + def parseHostPathVolumesWithPrefix( + sparkConf: SparkConf, + prefix: String): Map[String, KubernetesVolumeSpec] = { + parseVolumesWithPrefix(sparkConf, prefix, KUBERNETES_VOLUMES_HOSTPATH_KEY) + } + + /** + * Given hostPath volume specs, add volume to pod and volume mount to container. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param volumes list of named volume specs + * @return a tuple of (pod with the volume(s) added, container with mount(s) added) + */ + def addHostPathVolumes( + pod: Pod, + container: Container, + volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = { + val podBuilder = new PodBuilder(pod).editOrNewSpec() + val containerBuilder = new ContainerBuilder(container) + volumes foreach { + case (name, spec) => + var hostPath: Option[String] = None + if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { + hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) + } + if (hostPath.isDefined && spec.mountPath.isDefined) { + podBuilder.addToVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath.get)) + .withName(name) + .build()) + val volumeBuilder = new VolumeMountBuilder() + .withMountPath(spec.mountPath.get) + .withName(name) + if (spec.mountReadOnly.isDefined) { + containerBuilder + .addToVolumeMounts(volumeBuilder + .withReadOnly(spec.mountReadOnly.get) + .build()) + } else { + containerBuilder.addToVolumeMounts(volumeBuilder.build()) + } + } + } + (podBuilder.endSpec().build(), containerBuilder.build()) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 0707c7af4a236..8f94f1620ead8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ import scala.collection.mutable -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config._ @@ -110,12 +110,14 @@ private[spark] class BasicDriverFeatureStep( .endSpec() .build() - val driverHostPathVolumesSpec = KubernetesUtils.parseHostPathVolumesWithPrefix( - conf.sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) - val (driverPodWithHostPathVolumes, driverContainerWithHostPathVolumes) = - KubernetesUtils.addHostPathVolumes(driverPod, driverContainer, driverHostPathVolumesSpec) + val (driverPodWithVolumes, driverContainerVolumes) = + KubernetesVolumeUtils.addVolumes( + driverPod, + driverContainer, + conf.sparkConf, + KUBERNETES_DRIVER_VOLUMES_PREFIX) - SparkPod(driverPodWithHostPathVolumes, driverContainerWithHostPathVolumes) + SparkPod(driverPodWithVolumes, driverContainerVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 14acabb39ee51..f4db6cc91c6db 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} @@ -171,14 +171,13 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - val executorHostPathVolumesSpec = KubernetesUtils.parseHostPathVolumesWithPrefix( - kubernetesConf.sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - val (executorPodWithHostPathVolumes, executorContainerWithHostPathVolumes) = - KubernetesUtils.addHostPathVolumes(executorPod, + val (executorPodWithVolumes, executorContainerWithVolumes) = + KubernetesVolumeUtils.addVolumes(executorPod, containerWithLimitCores, - executorHostPathVolumesSpec) + kubernetesConf.sparkConf, + KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - SparkPod(executorPodWithHostPathVolumes, executorContainerWithHostPathVolumes) + SparkPod(executorPodWithVolumes, executorContainerWithVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty