Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ object Constants {
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB_CONFIG_MAP_NAME = "spark.kubernetes.executor.krbConfigMapName"

// Kerberos Configuration
val KERBEROS_DT_SECRET_NAME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features
import java.io.File
import java.nio.charset.StandardCharsets

import scala.collection.mutable
import scala.jdk.CollectionConverters._

import com.google.common.io.Files
Expand Down Expand Up @@ -209,14 +210,17 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
val additionalProps = mutable.Map[String, String]()
// If a submission-local keytab is provided, update the Spark config so that it knows the
// path of the keytab in the driver container.
if (needKeytabUpload) {
val ktName = new File(keytab.get).getName()
Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
} else {
Map.empty
additionalProps += (KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
}
if (hasKerberosConf) {
additionalProps += (KRB_CONFIG_MAP_NAME -> krb5CMap.getOrElse(newConfigMapName))
}
additionalProps.toMap
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._

/**
* Mounts the krb5 config map on the executor pod.
*/
private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {

private def krb5FileMapName: Option[String] = conf.getOption(KRB_CONFIG_MAP_NAME)

override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if krb5FileMapName.isDefined =>
val configMapVolume = new VolumeBuilder()
.withName(KRB_FILE_VOLUME)
.withNewConfigMap()
.withName(krb5FileMapName.get)
.endConfigMap()
.build()

val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(configMapVolume)
.endVolume()
.endSpec()
.build()

val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(KRB_FILE_VOLUME)
.withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
.withSubPath("krb5.conf")
.endVolumeMount()
.build()

SparkPod(podWithVolume, containerWithMount)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private[spark] class KubernetesExecutorBuilder {
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new KerberosConfExecutorFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures

val spec = KubernetesExecutorSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap))

checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap)
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(step.getAdditionalPodSystemProperties().contains(Constants.KRB_CONFIG_MAP_NAME))
assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty)
}

Expand All @@ -65,7 +65,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
assert(confMap.getData().keySet().asScala === Set(krbConf.getName()))

checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName())
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(step.getAdditionalPodSystemProperties().contains(Constants.KRB_CONFIG_MAP_NAME))
}

test("create keytab secret if client keytab file used") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.k8s.features

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.io.Files

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.util.Utils

class KerberosConfExecutorFeatureStepSuite extends SparkFunSuite {
import SecretVolumeUtils._

test("SPARK-50758: mounts the krb5 config map on the executor pod") {
val tmpDir = Utils.createTempDir()
val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
Files.write("some data", krbConf, UTF_8)

Seq(
new SparkConf(false)
.set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, "testConfigMap"),
new SparkConf(false)
.set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()),
new SparkConf(false)).foreach { sparkConf =>

val driverConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val driverStep = new KerberosConfDriverFeatureStep(driverConf)

val additionalPodSystemProperties = driverStep.getAdditionalPodSystemProperties()
val executorSparkConf = new SparkConf(false)
if (hasKerberosConf(driverConf)) {
assert(additionalPodSystemProperties.contains(Constants.KRB_CONFIG_MAP_NAME))
additionalPodSystemProperties.foreach { case (key, value) =>
executorSparkConf.set(key, value)
}
} else {
assert(additionalPodSystemProperties.isEmpty)
}

val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf)
val executorStep = new KerberosConfExecutorFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

checkPod(executorPod, hasKerberosConf(driverConf))
}
}

private def hasKerberosConf(conf: KubernetesConf): Boolean = {
val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE)
val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
krb5CMap.isDefined | krb5File.isDefined
}

private def checkPod(pod: SparkPod, hasKerberosConf: Boolean): Unit = {
if (hasKerberosConf) {
assert(podHasVolume(pod.pod, KRB_FILE_VOLUME))
assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf"))
} else {
assert(!podHasVolume(pod.pod, KRB_FILE_VOLUME))
assert(!containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf"))
}
}

}
Loading