From 24b0453a14629f76755bdd28e15b69194be1b06d Mon Sep 17 00:00:00 2001 From: lifumao Date: Fri, 10 Jan 2025 22:56:20 +0800 Subject: [PATCH] [SPARK-50758][K8S]Mounts the krb5 config map on the executor pod --- .../apache/spark/deploy/k8s/Constants.scala | 1 + .../KerberosConfDriverFeatureStep.scala | 10 ++- .../KerberosConfExecutorFeatureStep.scala | 59 +++++++++++++ .../k8s/KubernetesExecutorBuilder.scala | 1 + .../KerberosConfDriverFeatureStepSuite.scala | 4 +- ...KerberosConfExecutorFeatureStepSuite.scala | 84 +++++++++++++++++++ 6 files changed, 154 insertions(+), 5 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStepSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index aee07c096fe5..60207695c59f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -113,6 +113,7 @@ object Constants { val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR" val HADOOP_CONFIG_MAP_NAME = "spark.kubernetes.executor.hadoopConfigMapName" + val KRB_CONFIG_MAP_NAME = "spark.kubernetes.executor.krbConfigMapName" // Kerberos Configuration val KERBEROS_DT_SECRET_NAME = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala index 89aefe47e46d..27ea73dd5710 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features import java.io.File import java.nio.charset.StandardCharsets +import scala.collection.mutable import scala.jdk.CollectionConverters._ import com.google.common.io.Files @@ -209,14 +210,17 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri } override def getAdditionalPodSystemProperties(): Map[String, String] = { + val additionalProps = mutable.Map[String, String]() // If a submission-local keytab is provided, update the Spark config so that it knows the // path of the keytab in the driver container. if (needKeytabUpload) { val ktName = new File(keytab.get).getName() - Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName") - } else { - Map.empty + additionalProps += (KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName") + } + if (hasKerberosConf) { + additionalProps += (KRB_CONFIG_MAP_NAME -> krb5CMap.getOrElse(newConfigMapName)) } + additionalProps.toMap } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala new file mode 100644 index 000000000000..16a047f31779 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ + +/** + * Mounts the krb5 config map on the executor pod. + */ +private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesConf) + extends KubernetesFeatureConfigStep { + + private def krb5FileMapName: Option[String] = conf.getOption(KRB_CONFIG_MAP_NAME) + + override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if krb5FileMapName.isDefined => + val configMapVolume = new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(krb5FileMapName.get) + .endConfigMap() + .build() + + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(configMapVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() + .build() + + SparkPod(podWithVolume, containerWithMount) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index a85e42662b89..64099cde5917 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -72,6 +72,7 @@ private[spark] class KubernetesExecutorBuilder { new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), new HadoopConfExecutorFeatureStep(conf), + new KerberosConfExecutorFeatureStep(conf), new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesExecutorSpec( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala index b172bdc06ddc..682f8cf90319 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala @@ -49,7 +49,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap)) checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap) - assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(step.getAdditionalPodSystemProperties().contains(Constants.KRB_CONFIG_MAP_NAME)) assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty) } @@ -65,7 +65,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { assert(confMap.getData().keySet().asScala === Set(krbConf.getName())) checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName()) - assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(step.getAdditionalPodSystemProperties().contains(Constants.KRB_CONFIG_MAP_NAME)) } test("create keytab secret if client keytab file used") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStepSuite.scala new file mode 100644 index 000000000000..fdb1ab4534c7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStepSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import com.google.common.io.Files + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +class KerberosConfExecutorFeatureStepSuite extends SparkFunSuite { + import SecretVolumeUtils._ + + test("SPARK-50758: mounts the krb5 config map on the executor pod") { + val tmpDir = Utils.createTempDir() + val krbConf = File.createTempFile("krb5", ".conf", tmpDir) + Files.write("some data", krbConf, UTF_8) + + Seq( + new SparkConf(false) + .set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, "testConfigMap"), + new SparkConf(false) + .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()), + new SparkConf(false)).foreach { sparkConf => + + val driverConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val driverStep = new KerberosConfDriverFeatureStep(driverConf) + + val additionalPodSystemProperties = driverStep.getAdditionalPodSystemProperties() + val executorSparkConf = new SparkConf(false) + if (hasKerberosConf(driverConf)) { + assert(additionalPodSystemProperties.contains(Constants.KRB_CONFIG_MAP_NAME)) + additionalPodSystemProperties.foreach { case (key, value) => + executorSparkConf.set(key, value) + } + } else { + assert(additionalPodSystemProperties.isEmpty) + } + + val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf) + val executorStep = new KerberosConfExecutorFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + checkPod(executorPod, hasKerberosConf(driverConf)) + } + } + + private def hasKerberosConf(conf: KubernetesConf): Boolean = { + val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE) + val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) + krb5CMap.isDefined | krb5File.isDefined + } + + private def checkPod(pod: SparkPod, hasKerberosConf: Boolean): Unit = { + if (hasKerberosConf) { + assert(podHasVolume(pod.pod, KRB_FILE_VOLUME)) + assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf")) + } else { + assert(!podHasVolume(pod.pod, KRB_FILE_VOLUME)) + assert(!containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf")) + } + } + +}