diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4086970ffb256..96f2a0a88d467 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -162,10 +162,18 @@ private[spark] object Config extends Logging { val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." + val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes." val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets." + val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes." + + val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" + val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" + val KUBERNETES_VOLUMES_PATH_KEY = "path" + val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" + val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index ee629068ad90d..5bc070147d3a8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model.LocalObjectReference - import org.apache.spark.SparkConf import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala new file mode 100644 index 0000000000000..afd6524aed548 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.deploy.k8s
+
+import scala.collection.mutable.Map
+
+private[spark] case class KubernetesVolumeSpec(
+    var mountPath: Option[String],
+    var mountReadOnly: Option[Boolean],
+    var optionsSpec: Map[String, String])
+
+private[spark] object KubernetesVolumeSpec {
+  def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, Map())
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
new file mode 100644
index 0000000000000..ed05acd23f5a9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import scala.collection.mutable.HashMap
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+
+private[spark] object KubernetesVolumeUtils {
+
+  /**
+   * Parse hostPath volume specs from the Spark configuration and add the resulting
+   * volumes to the pod and the corresponding volume mounts to the container.
+   *
+   * @param pod original specification of the pod
+   * @param container original specification of the container
+   * @param sparkConf Spark configuration
+   * @param prefix the prefix for volume configuration
+   * @return a tuple of (pod with the volume(s) added, container with mount(s) added)
+   */
+  def addVolumes(
+      pod: Pod,
+      container: Container,
+      sparkConf: SparkConf,
+      prefix: String): (Pod, Container) = {
+    val hostPathVolumeSpecs = parseHostPathVolumesWithPrefix(sparkConf, prefix)
+    addHostPathVolumes(pod, container, hostPathVolumeSpecs)
+  }
+
+  /**
+   * Extract Spark volume configuration properties with a given name prefix.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @param volumeTypeKey the volume type key, e.g. hostPath
+   * @return a Map with volume name as key and volume spec as value
+   */
+  def parseVolumesWithPrefix(
+      sparkConf: SparkConf,
+      prefix: String,
+      volumeTypeKey: String): Map[String, KubernetesVolumeSpec] = {
+    val volumes = HashMap[String, KubernetesVolumeSpec]()
+    val properties = sparkConf.getAllWithPrefix(s"$prefix$volumeTypeKey.").toList
+    // Extract volume names
+    properties.foreach { case (key, _) =>
+      val keys = key.split("\\.")
+      if (keys.nonEmpty && !volumes.contains(keys(0))) {
+        volumes.update(keys(0), KubernetesVolumeSpec.emptySpec())
+      }
+    }
+    // Populate each spec from its matching properties
+    volumes.foreach { case (name, spec) =>
+      properties.foreach { case (key, value) =>
+        key.split("\\.") match {
+          case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) =>
+            spec.mountPath = Some(value)
+          case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) =>
+            spec.mountReadOnly = Some(value.toBoolean)
+          case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) =>
+            spec.optionsSpec.update(option, value)
+          case _ =>
+        }
+      }
+    }
+    volumes.toMap
+  }
+
+  /**
+   * Extract Spark hostPath volume configuration properties with a given name prefix and
+   * return the result as a Map.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map with volume name as key and volume spec as value
+   */
+  def parseHostPathVolumesWithPrefix(
+      sparkConf: SparkConf,
+      prefix: String): Map[String, KubernetesVolumeSpec] = {
+    parseVolumesWithPrefix(sparkConf, prefix, KUBERNETES_VOLUMES_HOSTPATH_KEY)
+  }
+
+  /**
+   * Given hostPath volume specs, add the volumes to the pod and volume mounts to the container.
+   *
+   * @param pod original specification of the pod
+   * @param container original specification of the container
+   * @param volumes map from volume name to volume spec
+   * @return a tuple of (pod with the volume(s) added, container with mount(s) added)
+   */
+  def addHostPathVolumes(
+      pod: Pod,
+      container: Container,
+      volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = {
+    val podBuilder = new PodBuilder(pod).editOrNewSpec()
+    val containerBuilder = new ContainerBuilder(container)
+    volumes.foreach { case (name, spec) =>
+      val hostPath = spec.optionsSpec.get(KUBERNETES_VOLUMES_PATH_KEY)
+      if (hostPath.isDefined && spec.mountPath.isDefined) {
+        podBuilder.addToVolumes(new VolumeBuilder()
+          .withHostPath(new HostPathVolumeSource(hostPath.get))
+          .withName(name)
+          .build())
+        val volumeMountBuilder = new VolumeMountBuilder()
+          .withMountPath(spec.mountPath.get)
+          .withName(name)
+        spec.mountReadOnly.foreach { readOnly => volumeMountBuilder.withReadOnly(readOnly) }
+        containerBuilder.addToVolumeMounts(volumeMountBuilder.build())
+      }
+    }
+    (podBuilder.endSpec().build(), containerBuilder.build())
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 07bdccbe0479d..8f94f1620ead8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
@@ -109,7 +109,15 @@ private[spark] class BasicDriverFeatureStep(
       .addToImagePullSecrets(conf.imagePullSecrets(): _*)
       .endSpec()
       .build()
-    SparkPod(driverPod, driverContainer)
+
+    val (driverPodWithVolumes, driverContainerWithVolumes) =
+      KubernetesVolumeUtils.addVolumes(
+        driverPod,
+        driverContainer,
+        conf.sparkConf,
+        KUBERNETES_DRIVER_VOLUMES_PREFIX)
+
+    SparkPod(driverPodWithVolumes, driverContainerWithVolumes)
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d22097587aafe..f4db6cc91c6db 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -18,10 +18,10 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -170,7 +170,15 @@ private[spark] class BasicExecutorFeatureStep(
       .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
       .endSpec()
       .build()
-    SparkPod(executorPod, containerWithLimitCores)
+
+    val (executorPodWithVolumes, executorContainerWithVolumes) =
+      KubernetesVolumeUtils.addVolumes(
+        executorPod,
+        containerWithLimitCores,
+        kubernetesConf.sparkConf,
+        KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
+
+    SparkPod(executorPodWithVolumes, executorContainerWithVolumes)
   }
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index eee85b8baa730..cd419ead9e199 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -150,4 +150,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt")
     assert(additionalProperties === expectedSparkConf)
   }
+
+  test("single driver hostPath volume gets mounted") {
+    hostPathVolumeTest(1, false)
+  }
+
+  test("multiple driver hostPath volumes get mounted") {
+    hostPathVolumeTest(2, false)
+  }
+
+  test("single driver hostPath volume gets mounted w/ readOnly option") {
+    hostPathVolumeTest(1, true)
+  }
+
+  test("multiple driver hostPath volumes get mounted w/ readOnly option") {
+    hostPathVolumeTest(2, true)
+  }
+
+  private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    for (i <- 0 until numVolumes) {
+      sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.path",
+        s"/opt/mount$i")
+      sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.options.path",
+        s"/tmp/mount$i")
+      if (readOnly) {
+        sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.readOnly",
+          "true")
+      }
+    }
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        APP_NAME,
+        MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      Map.empty)
+    val step = new BasicDriverFeatureStep(kubernetesConf)
+    val driver = step.configurePod(SparkPod.initialPod())
+
+    assert(driver.container.getVolumeMounts.size() === numVolumes)
+    assert(driver.pod.getSpec.getVolumes.size() === numVolumes)
+    for (i <- 0 until numVolumes) {
+      assert(driver.container.getVolumeMounts.asScala
+        .exists(v => v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))
+      assert(driver.pod.getSpec.getVolumes.asScala
+        .exists(v => v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))
+      if (readOnly) {
+        assert(driver.container.getVolumeMounts.get(i).getReadOnly)
+      }
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index a764f7630b5c8..387b28483a699 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -152,6 +152,62 @@ class BasicExecutorFeatureStepSuite
     checkOwnerReferences(executor.pod, DRIVER_POD_UID)
   }
 
+  test("single executor hostPath volume gets mounted") {
+    hostPathVolumeTest(1, false)
+  }
+
+  test("multiple executor hostPath volumes get mounted") {
+    hostPathVolumeTest(2, false)
+  }
+
+  test("single executor hostPath volume gets mounted w/ readOnly option") {
+    hostPathVolumeTest(1, true)
+  }
+
+  test("multiple executor hostPath volumes get mounted w/ readOnly option") {
+    hostPathVolumeTest(2, true)
+  }
+
+  private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = {
+    val conf = baseConf.clone()
+    for (i <- 0 until numVolumes) {
+      conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path",
+        s"/opt/mount$i")
+      conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path",
+        s"/tmp/mount$i")
+      if (readOnly) {
+        conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.readOnly",
+          "true")
+      }
+    }
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(executor.container.getImage === EXECUTOR_IMAGE)
+    assert(executor.container.getVolumeMounts.size() === numVolumes)
+    assert(executor.pod.getSpec.getVolumes.size() === numVolumes)
+    for (i <- 0 until numVolumes) {
+      assert(executor.container.getVolumeMounts.asScala
+        .exists(v => v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))
+      assert(executor.pod.getSpec.getVolumes.asScala
+        .exists(v => v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))
+      if (readOnly) {
+        assert(executor.container.getVolumeMounts.get(i).getReadOnly)
+      }
+    }
+
+    checkOwnerReferences(executor.pod, DRIVER_POD_UID)
+  }
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
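
The tests above exercise the property scheme introduced in Config.scala:
spark.kubernetes.<driver|executor>.volumes.<type>.<name>.<mount|options>.<key>.
For reference, a minimal user-side sketch declaring one driver hostPath volume; the
volume name "checkpoints" and both paths are illustrative, not part of this patch:

    import org.apache.spark.SparkConf

    // Mount the host directory /mnt/checkpoints read-only at /checkpoints
    // inside the driver container, as a hostPath volume named "checkpoints".
    val conf = new SparkConf()
      .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.mount.path", "/checkpoints")
      .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.mount.readOnly", "true")
      .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.options.path", "/mnt/checkpoints")

BasicDriverFeatureStep picks these properties up under KUBERNETES_DRIVER_VOLUMES_PREFIX
when configurePod runs; the executor-side prefix works the same way.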