diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d5f2865f87281..64dc86fc850e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -335,7 +335,7 @@ private[spark] class SparkSubmit extends Logging { val targetDir = Utils.createTempDir() // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) { + if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) { if (args.principal != null) { if (args.keytab != null) { require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") @@ -646,7 +646,8 @@ private[spark] class SparkSubmit extends Logging { } } - if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) { + if ((clusterManager == MESOS || clusterManager == KUBERNETES) + && UserGroupInformation.isSecurityEnabled) { setRMPrincipal(sparkConf) } @@ -762,8 +763,8 @@ private[spark] class SparkSubmit extends Logging { } // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with - // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we - // must trick it into thinking we're YARN. + // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes + // mode, we must trick it into thinking we're YARN. private def setRMPrincipal(sparkConf: SparkConf): Unit = { val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 4ae7acaae2314..9dfb52634e21f 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -820,4 +820,45 @@ specific to Spark on Kubernetes. This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3. + + spark.kubernetes.kerberos.krb5.path + (none) + + Specify the local location of the krb5.conf file to be mounted on the driver and executors for Kerberos interaction. + It is important to note that the KDC defined needs to be visible from inside the containers. + + + + spark.kubernetes.kerberos.krb5.configMapName + (none) + + Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted on the driver and executors + for Kerberos interaction. The KDC defined needs to be visible from inside the containers. The ConfigMap must also + be in the same namespace of the driver and executor pods. + + + + spark.kubernetes.hadoop.configMapName + (none) + + Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver + and executors for custom Hadoop configuration. + + + + spark.kubernetes.kerberos.tokenSecret.name + (none) + + Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user + to provide any kerberos credentials for launching a job. + + + + spark.kubernetes.kerberos.tokenSecret.itemKey + (none) + + Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user + to provide any kerberos credentials for launching a job. 
+
+
diff --git a/docs/security.md b/docs/security.md
index 7fb3e17de94c9..ffae683df6256 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -722,7 +722,82 @@ with encryption, at least.
 The Kerberos login will be periodically renewed using the provided credentials, and new delegation tokens for supported will be created.
+## Secure Interaction with Kubernetes
+
+When talking to Hadoop-based services behind Kerberos, Spark needs to obtain delegation tokens
+so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are
+shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:
+
+In all cases you must either define the environment variable `HADOOP_CONF_DIR` or set
+`spark.kubernetes.hadoop.configMapName`.
+
+It is also important to note that the KDC needs to be visible from inside the containers.
+
+If a user wishes to use a remote HADOOP_CONF directory that contains the Hadoop configuration files, this can be
+achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing ConfigMap.
+
+1. Submitting with a $kinit that stores a TGT in the Local Ticket Cache:
+```bash
+/usr/bin/kinit -kt <keytab_file> <username>/<krb5_realm>
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
+2. Submitting with a local Keytab and Principal
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kerberos.keytab=<KEYTAB_FILE> \
+    --conf spark.kerberos.principal=<PRINCIPAL> \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
+3. Submitting with pre-populated secrets already existing within the namespace that contain the Delegation Token
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
+    --conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
+
+3b. Submitting as in (3), however specifying a pre-created krb5 ConfigMap and a pre-created `HADOOP_CONF_DIR` ConfigMap
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
+    --conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
+    --conf spark.kubernetes.hadoop.configMapName=<HCONF_CONFIG_MAP_NAME> \
+    --conf spark.kubernetes.kerberos.krb5.configMapName=<KRB_CONFIG_MAP_NAME> \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
 # Event Logging
 If your applications are using event logging, the directory where the event logs go
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index e1f985ece8c06..08af3306a96f1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -41,6 +41,8 @@ object HdfsTest {
       val end = System.currentTimeMillis()
       println(s"Iteration $iter took ${end-start} ms")
     }
+    println(s"File contents: ${file.map(_.toString).take(1).mkString(",").slice(0, 10)}")
+    println(s"Returned length(s) of: ${file.map(_.length).sum().toString}")
     spark.stop()
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 71e4d321a0e3a..c2ad80c4755a6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,43 @@ private[spark] object Config extends Logging {
       "Ensure that major Python version is either Python2 or Python3")
     .createWithDefault("2")
+  val KUBERNETES_KERBEROS_KRB5_FILE =
+    ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
+      .doc("Specify the local location of the krb5.conf file to be mounted on the driver " +
+        "and executors for Kerberos. Note: The KDC defined needs to be " +
+        "visible from inside the containers.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_KRB5_CONFIG_MAP =
+    ConfigBuilder("spark.kubernetes.kerberos.krb5.configMapName")
+      .doc("Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted " +
+        "on the driver and executors for Kerberos. Note: The KDC defined " +
+        "needs to be visible from inside the containers.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_HADOOP_CONF_CONFIG_MAP =
+    ConfigBuilder("spark.kubernetes.hadoop.configMapName")
+      .doc("Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, " +
+        "to be mounted on the driver and executors for custom Hadoop configuration.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_DT_SECRET_NAME =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.name")
+      .doc("Specify the name of the secret where your existing delegation tokens are stored. " +
+        "This removes the need for the job user to provide any keytab for launching a job")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.itemKey")
+      .doc("Specify the item key of the data where your existing delegation tokens are stored.
" + + "This removes the need for the job user to provide any keytab for launching a job") + .stringConf + .createOptional + val APP_RESOURCE_TYPE = ConfigBuilder("spark.kubernetes.resource.type") .doc("This sets the resource type internally") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 8202d874a4626..172a9054bb4f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -60,11 +60,13 @@ private[spark] object Constants { val ENV_CLASSPATH = "SPARK_CLASSPATH" val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS" val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR" + val ENV_SPARK_USER = "SPARK_USER" // Spark app configs for containers val SPARK_CONF_VOLUME = "spark-conf-volume" val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf" val SPARK_CONF_FILE_NAME = "spark.properties" val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME" + val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" // BINDINGS val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" @@ -78,4 +80,29 @@ private[spark] object Constants { val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" val MEMORY_OVERHEAD_MIN_MIB = 384L + + // Hadoop Configuration + val HADOOP_FILE_VOLUME = "hadoop-properties" + val KRB_FILE_VOLUME = "krb5-file" + val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf" + val KRB_FILE_DIR_PATH = "/etc" + val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR" + val HADOOP_CONFIG_MAP_NAME = + "spark.kubernetes.executor.hadoopConfigMapName" + val KRB5_CONFIG_MAP_NAME = + "spark.kubernetes.executor.krb5ConfigMapName" + + // Kerberos Configuration + val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens" + val KERBEROS_DT_SECRET_NAME = + "spark.kubernetes.kerberos.dt-secret-name" + val KERBEROS_DT_SECRET_KEY = + "spark.kubernetes.kerberos.dt-secret-key" + val KERBEROS_SPARK_USER_NAME = + "spark.kubernetes.kerberos.spark-user-name" + val KERBEROS_SECRET_KEY = "hadoop-tokens" + + // Hadoop credentials secrets for the Spark app. 
+ val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials" + val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index cae6e7d5ad518..3e30ab2c8353e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -19,12 +19,15 @@ package org.apache.spark.deploy.k8s import scala.collection.mutable import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod} +import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.ConfigEntry @@ -47,6 +50,13 @@ private[spark] case class KubernetesExecutorSpecificConf( driverPod: Option[Pod]) extends KubernetesRoleSpecificConf +/* + * Structure containing metadata for HADOOP_CONF_DIR customization + */ +private[spark] case class HadoopConfSpec( + hadoopConfDir: Option[String], + hadoopConfigMapName: Option[String]) + /** * Structure containing metadata for Kubernetes logic to build Spark pods. */ @@ -61,7 +71,15 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - sparkFiles: Seq[String]) { + sparkFiles: Seq[String], + hadoopConfSpec: Option[HadoopConfSpec]) { + + def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" + + def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file" + + def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager = + new KubernetesHadoopDelegationTokenManager(new HadoopDelegationTokenManager(conf, hConf)) def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) @@ -116,7 +134,8 @@ private[spark] object KubernetesConf { mainAppResource: Option[MainAppResource], mainClass: String, appArgs: Array[String], - maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { + maybePyFiles: Option[String], + hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { @@ -175,6 +194,19 @@ private[spark] object KubernetesConf { .map(str => str.split(",").toSeq) .getOrElse(Seq.empty[String]) ++ additionalFiles + val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) + KubernetesUtils.requireNandDefined( + hadoopConfDir, + hadoopConfigMapName, + "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + + "as the creation of an additional ConfigMap, when one is already specified is extraneous" ) + val hadoopConfSpec = + if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) { + Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName)) + } else { + 
None + } + KubernetesConf( sparkConfWithMainAppJar, KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), @@ -186,7 +218,8 @@ private[spark] object KubernetesConf { driverSecretEnvNamesToKeyRefs, driverEnvs, driverVolumes, - sparkFiles) + sparkFiles, + hadoopConfSpec) } def createExecutorConf( @@ -242,6 +275,7 @@ private[spark] object KubernetesConf { executorEnvSecrets, executorEnv, executorVolumes, - Seq.empty[String]) + Seq.empty[String], + None) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index f5fae7cc8c470..8f36fa12aed17 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -39,8 +39,27 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } + def requireBothOrNeitherDefined( + opt1: Option[_], + opt2: Option[_], + errMessageWhenFirstIsMissing: String, + errMessageWhenSecondIsMissing: String): Unit = { + requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) + requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) + } + + def requireSecondIfFirstIsDefined( + opt1: Option[_], + opt2: Option[_], + errMessageWhenSecondIsMissing: String): Unit = { + opt1.foreach { _ => + require(opt2.isDefined, errMessageWhenSecondIsMissing) + } + } + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } + opt2.foreach { _ => require(opt1.isEmpty, errMessage) } } /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala new file mode 100644 index 0000000000000..fd09de2a918a1 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil +import org.apache.spark.internal.Logging + +/** + * This step is responsible for bootstraping the container with ConfigMaps + * containing Hadoop config files mounted as volumes and an ENV variable + * pointed to the mounted file directory. + */ +private[spark] class HadoopConfExecutorFeatureStep( + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep with Logging { + + override def configurePod(pod: SparkPod): SparkPod = { + val sparkConf = kubernetesConf.sparkConf + val hadoopConfDirCMapName = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) + require(hadoopConfDirCMapName.isDefined, + "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " + + " using pre-existing ConfigMaps") + logInfo("HADOOP_CONF_DIR defined") + HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala new file mode 100644 index 0000000000000..5b6a6d5a7db45 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil +import org.apache.spark.internal.Logging + +/** + * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected + * however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER + */ +private[spark] class HadoopSparkUserExecutorFeatureStep( + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep with Logging { + + override def configurePod(pod: SparkPod): SparkPod = { + val sparkUserName = kubernetesConf.sparkConf.get(KERBEROS_SPARK_USER_NAME) + HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala new file mode 100644 index 0000000000000..ce47933b7f700 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils._ +import org.apache.spark.internal.Logging + +/** + * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the + * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. 
+ */ +private[spark] class KerberosConfDriverFeatureStep( + kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep with Logging { + + require(kubernetesConf.hadoopConfSpec.isDefined, + "Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined ConfigMap") + private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get + private val conf = kubernetesConf.sparkConf + private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL) + private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB) + private val existingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) + private val existingSecretItemKey = conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) + private val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE) + private val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) + private val kubeTokenManager = kubernetesConf.tokenManager(conf, + SparkHadoopUtil.get.newConfiguration(conf)) + private val isKerberosEnabled = + (hadoopConfDirSpec.hadoopConfDir.isDefined && kubeTokenManager.isSecurityEnabled) || + (hadoopConfDirSpec.hadoopConfigMapName.isDefined && + (krb5File.isDefined || krb5CMap.isDefined)) + require(keytab.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Keytab") + + require(existingSecretName.isEmpty || isKerberosEnabled, + "You must enable Kerberos support if you are specifying a Kerberos Secret") + + KubernetesUtils.requireNandDefined( + krb5File, + krb5CMap, + "Do not specify both a Krb5 local file and the ConfigMap as the creation " + + "of an additional ConfigMap, when one is already specified, is extraneous") + + KubernetesUtils.requireBothOrNeitherDefined( + keytab, + principal, + "If a Kerberos principal is specified you must also specify a Kerberos keytab", + "If a Kerberos keytab is specified you must also specify a Kerberos principal") + + KubernetesUtils.requireBothOrNeitherDefined( + existingSecretName, + existingSecretItemKey, + "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + + " you must also specify the name of the secret", + "If a secret storing a Kerberos Delegation Token is specified you must also" + + " specify the item-key where the data is stored") + + private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => + HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + } + private val newHadoopConfigMapName = + if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { + Some(kubernetesConf.hadoopConfigMapName) + } else { + None + } + + // Either use pre-existing secret or login to create new Secret with DT stored within + private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { + secretName <- existingSecretName + secretItemKey <- existingSecretItemKey + } yield { + KerberosConfigSpec( + dtSecret = None, + dtSecretName = secretName, + dtSecretItemKey = secretItemKey, + jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) + }).orElse( + if (isKerberosEnabled) { + Some(HadoopKerberosLogin.buildSpec( + conf, + kubernetesConf.appResourceNamePrefix, + kubeTokenManager)) + } else { + None + } + ) + + override def configurePod(pod: SparkPod): SparkPod = { + val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir( + hadoopConfDirSpec.hadoopConfDir, + newHadoopConfigMapName, + hadoopConfDirSpec.hadoopConfigMapName, + pod) + kerberosConfSpec.map { hSpec => + HadoopBootstrapUtil.bootstrapKerberosPod( + hSpec.dtSecretName, + 
hSpec.dtSecretItemKey, + hSpec.jobUserName, + krb5File, + Some(kubernetesConf.krbConfigMapName), + krb5CMap, + hadoopBasedSparkPod) + }.getOrElse( + HadoopBootstrapUtil.bootstrapSparkUserPod( + kubeTokenManager.getCurrentUser.getShortUserName, + hadoopBasedSparkPod)) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = { + val resolvedConfValues = kerberosConfSpec.map { hSpec => + Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName, + KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey, + KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName, + KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName)) + }.getOrElse( + Map(KERBEROS_SPARK_USER_NAME -> + kubeTokenManager.getCurrentUser.getShortUserName)) + Map(HADOOP_CONFIG_MAP_NAME -> + hadoopConfDirSpec.hadoopConfigMapName.getOrElse( + kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues + } + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + val hadoopConfConfigMap = for { + hName <- newHadoopConfigMapName + hFiles <- hadoopConfigurationFiles + } yield { + HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles) + } + + val krb5ConfigMap = krb5File.map { fileLocation => + HadoopBootstrapUtil.buildkrb5ConfigMap( + kubernetesConf.krbConfigMapName, + fileLocation) + } + + val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret) + + hadoopConfConfigMap.toSeq ++ + krb5ConfigMap.toSeq ++ + kerberosDTSecret.toSeq + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala new file mode 100644 index 0000000000000..06a88b6c229f7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.HasMetadata + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf +import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil +import org.apache.spark.internal.Logging + +/** + * This step is responsible for mounting the DT secret for the executors + */ +private[spark] class KerberosConfExecutorFeatureStep( + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep with Logging { + + private val sparkConf = kubernetesConf.sparkConf + private val maybeKrb5CMap = sparkConf.getOption(KRB5_CONFIG_MAP_NAME) + require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found") + + override def configurePod(pod: SparkPod): SparkPod = { + logInfo(s"Mounting Resources for Kerberos") + HadoopBootstrapUtil.bootstrapKerberosPod( + sparkConf.get(KERBEROS_DT_SECRET_NAME), + sparkConf.get(KERBEROS_DT_SECRET_KEY), + sparkConf.get(KERBEROS_SPARK_USER_NAME), + None, + None, + maybeKrb5CMap, + pod) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty[HasMetadata] +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala new file mode 100644 index 0000000000000..5bee766caf2be --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkPod +import org.apache.spark.internal.Logging + +private[spark] object HadoopBootstrapUtil extends Logging { + + /** + * Mounting the DT secret for both the Driver and the executors + * + * @param dtSecretName Name of the secret that stores the Delegation Token + * @param dtSecretItemKey Name of the Item Key storing the Delegation Token + * @param userName Name of the SparkUser to set SPARK_USER + * @param fileLocation Optional Location of the krb5 file + * @param newKrb5ConfName Optional location of the ConfigMap for Krb5 + * @param existingKrb5ConfName Optional name of ConfigMap for Krb5 + * @param pod Input pod to be appended to + * @return a modified SparkPod + */ + def bootstrapKerberosPod( + dtSecretName: String, + dtSecretItemKey: String, + userName: String, + fileLocation: Option[String], + newKrb5ConfName: Option[String], + existingKrb5ConfName: Option[String], + pod: SparkPod): SparkPod = { + + val preConfigMapVolume = existingKrb5ConfName.map { kconf => + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kconf) + .endConfigMap() + .build() + } + + val createConfigMapVolume = for { + fLocation <- fileLocation + krb5ConfName <- newKrb5ConfName + } yield { + val krb5File = new File(fLocation) + val fileStringPath = krb5File.toPath.getFileName.toString + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(krb5ConfName) + .withItems(new KeyToPathBuilder() + .withKey(fileStringPath) + .withPath(fileStringPath) + .build()) + .endConfigMap() + .build() + } + + // Breaking up Volume creation for clarity + val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume) + if (configMapVolume.isEmpty) { + logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. 
" + + "Make sure that you have the krb5.conf locally on the Driver and Executor images") + } + + val kerberizedPodWithDTSecret = new PodBuilder(pod.pod) + .editOrNewSpec() + .addNewVolume() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withNewSecret() + .withSecretName(dtSecretName) + .endSecret() + .endVolume() + .endSpec() + .build() + + // Optionally add the krb5.conf ConfigMap + val kerberizedPod = configMapVolume.map { cmVolume => + new PodBuilder(kerberizedPodWithDTSecret) + .editSpec() + .addNewVolumeLike(cmVolume) + .endVolume() + .endSpec() + .build() + }.getOrElse(kerberizedPodWithDTSecret) + + val kerberizedContainerWithMounts = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) + .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey") + .endEnv() + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(userName) + .endEnv() + .build() + + // Optionally add the krb5.conf Volume Mount + val kerberizedContainer = + if (configMapVolume.isDefined) { + new ContainerBuilder(kerberizedContainerWithMounts) + .addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() + .build() + } else { + kerberizedContainerWithMounts + } + + SparkPod(kerberizedPod, kerberizedContainer) + } + + /** + * setting ENV_SPARK_USER when HADOOP_FILES are detected + * + * @param sparkUserName Name of the SPARK_USER + * @param pod Input pod to be appended to + * @return a modified SparkPod + */ + def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = { + val envModifiedContainer = new ContainerBuilder(pod.container) + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(sparkUserName) + .endEnv() + .build() + SparkPod(pod.pod, envModifiedContainer) + } + + /** + * Grabbing files in the HADOOP_CONF_DIR + * + * @param path location of HADOOP_CONF_DIR + * @return a list of File object + */ + def getHadoopConfFiles(path: String): Seq[File] = { + val dir = new File(path) + if (dir.isDirectory) { + dir.listFiles.filter(_.isFile).toSeq + } else { + Seq.empty[File] + } + } + + /** + * Bootstraping the container with ConfigMaps that store + * Hadoop configuration files + * + * @param hadoopConfDir directory location of HADOOP_CONF_DIR env + * @param newHadoopConfigMapName name of the new configMap for HADOOP_CONF_DIR + * @param existingHadoopConfigMapName name of the pre-defined configMap for HADOOP_CONF_DIR + * @param pod Input pod to be appended to + * @return a modified SparkPod + */ + def bootstrapHadoopConfDir( + hadoopConfDir: Option[String], + newHadoopConfigMapName: Option[String], + existingHadoopConfigMapName: Option[String], + pod: SparkPod): SparkPod = { + val preConfigMapVolume = existingHadoopConfigMapName.map { hConf => + new VolumeBuilder() + .withName(HADOOP_FILE_VOLUME) + .withNewConfigMap() + .withName(hConf) + .endConfigMap() + .build() } + + val createConfigMapVolume = for { + dirLocation <- hadoopConfDir + hConfName <- newHadoopConfigMapName + } yield { + val hadoopConfigFiles = getHadoopConfFiles(dirLocation) + val keyPaths = hadoopConfigFiles.map { file => + val fileStringPath = file.toPath.getFileName.toString + new KeyToPathBuilder() + .withKey(fileStringPath) + .withPath(fileStringPath) + .build() + } + new VolumeBuilder() + .withName(HADOOP_FILE_VOLUME) + .withNewConfigMap() 
+ .withName(hConfName) + .withItems(keyPaths.asJava) + .endConfigMap() + .build() + } + + // Breaking up Volume Creation for clarity + val configMapVolume = preConfigMapVolume.getOrElse(createConfigMapVolume.get) + + val hadoopSupportedPod = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(configMapVolume) + .endVolume() + .endSpec() + .build() + + val hadoopSupportedContainer = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(HADOOP_FILE_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + SparkPod(hadoopSupportedPod, hadoopSupportedContainer) + } + + /** + * Builds ConfigMap given the file location of the + * krb5.conf file + * + * @param configMapName name of configMap for krb5 + * @param fileLocation location of krb5 file + * @return a ConfigMap + */ + def buildkrb5ConfigMap( + configMapName: String, + fileLocation: String): ConfigMap = { + val file = new File(fileLocation) + new ConfigMapBuilder() + .withNewMetadata() + .withName(configMapName) + .endMetadata() + .addToData(Map(file.toPath.getFileName.toString -> + Files.toString(file, StandardCharsets.UTF_8)).asJava) + .build() + } + + /** + * Builds ConfigMap given the ConfigMap name + * and a list of Hadoop Conf files + * + * @param hadoopConfigMapName name of hadoopConfigMap + * @param hadoopConfFiles list of hadoopFiles + * @return a ConfigMap + */ + def buildHadoopConfigMap( + hadoopConfigMapName: String, + hadoopConfFiles: Seq[File]): ConfigMap = { + new ConfigMapBuilder() + .withNewMetadata() + .withName(hadoopConfigMapName) + .endMetadata() + .addToData(hadoopConfFiles.map { file => + (file.toPath.getFileName.toString, + Files.toString(file, StandardCharsets.UTF_8)) + }.toMap.asJava) + .build() + } + +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala new file mode 100644 index 0000000000000..67a58491e442e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import io.fabric8.kubernetes.api.model.SecretBuilder +import org.apache.commons.codec.binary.Base64 + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager + +/** + * This logic does all the heavy lifting for Delegation Token creation. This step + * assumes that the job user has either specified a principal and keytab or ran + * $kinit before running spark-submit. By running UGI.getCurrentUser we are able + * to obtain the current user, either signed in via $kinit or keytab. With the + * Job User principal you then retrieve the delegation token from the NameNode + * and store values in DelegationToken. Lastly, the class puts the data into + * a secret. All this is defined in a KerberosConfigSpec. + */ +private[spark] object HadoopKerberosLogin { + def buildSpec( + submissionSparkConf: SparkConf, + kubernetesResourceNamePrefix: String, + tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) + // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal + // The login happens in the SparkSubmit so login logic is not necessary to include + val jobUserUGI = tokenManager.getCurrentUser + val originalCredentials = jobUserUGI.getCredentials + val (tokenData, renewalInterval) = tokenManager.getDelegationTokens( + originalCredentials, + submissionSparkConf, + hadoopConf) + require(tokenData.nonEmpty, "Did not obtain any delegation tokens") + val initialTokenDataKeyName = KERBEROS_SECRET_KEY + val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME" + val secretDT = + new SecretBuilder() + .withNewMetadata() + .withName(newSecretName) + .endMetadata() + .addToData(initialTokenDataKeyName, Base64.encodeBase64String(tokenData)) + .build() + KerberosConfigSpec( + dtSecret = Some(secretDT), + dtSecretName = newSecretName, + dtSecretItemKey = initialTokenDataKeyName, + jobUserName = jobUserUGI.getShortUserName) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala new file mode 100644 index 0000000000000..7f7ef216cf485 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features.hadooputils + +import io.fabric8.kubernetes.api.model.Secret + +/** + * Represents a given configuration of the Kerberos Configuration logic + *

+ * - The secret containing a DT, either previously specified or built on the fly + * - The name of the secret where the DT will be stored + * - The data item-key on the secret which correlates with where the current DT data is stored + * - The Job User's username + */ +private[spark] case class KerberosConfigSpec( + dtSecret: Option[Secret], + dtSecretName: String, + dtSecretItemKey: String, + jobUserName: String) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala new file mode 100644 index 0000000000000..135e2c482bbbc --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging + +/** + * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens + * on the behalf of the Kubernetes submission client. The new credentials + * (called Tokens when they are serialized) are stored in Secrets accessible + * to the driver and executors, when new Tokens are received they overwrite the current Secrets. 
+ */ +private[spark] class KubernetesHadoopDelegationTokenManager( + tokenManager: HadoopDelegationTokenManager) extends Logging { + + // HadoopUGI Util methods + def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser + def getShortUserName: String = getCurrentUser.getShortUserName + def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf) + def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled + def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + def serializeCreds(creds: Credentials): Array[Byte] = SparkHadoopUtil.get.serialize(creds) + def nextRT(rt: Long, conf: SparkConf): Long = SparkHadoopUtil.nextCredentialRenewalTime(rt, conf) + + def getDelegationTokens( + creds: Credentials, + conf: SparkConf, + hadoopConf: Configuration): (Array[Byte], Long) = { + try { + val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) + logDebug(s"Initialized tokens") + (serializeCreds(creds), nextRT(rt, conf)) + } catch { + case e: Exception => + logError(s"Failed to fetch Hadoop delegation tokens $e") + throw e + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index edeaa380194ac..e495e519baf65 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -22,6 +22,7 @@ import java.util.Properties import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient +import org.apache.hadoop.security.UserGroupInformation import scala.collection.mutable import scala.util.control.NonFatal @@ -45,7 +46,8 @@ private[spark] case class ClientArguments( mainAppResource: Option[MainAppResource], mainClass: String, driverArgs: Array[String], - maybePyFiles: Option[String]) + maybePyFiles: Option[String], + hadoopConfigDir: Option[String]) private[spark] object ClientArguments { @@ -79,7 +81,8 @@ private[spark] object ClientArguments { mainAppResource, mainClass.get, driverArgs.toArray, - maybePyFiles) + maybePyFiles, + sys.env.get(ENV_HADOOP_CONF_DIR)) } } @@ -222,7 +225,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication { clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, - clientArguments.maybePyFiles) + clientArguments.maybePyFiles, + clientArguments.hadoopConfigDir) val builder = new KubernetesDriverBuilder val namespace = kubernetesConf.namespace() // The master URL has been checked for validity already in SparkSubmit. 
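For reference, the delegation-token Secret consumed by submission mode (3) in `docs/security.md` above can be assembled with the same fabric8 and Hadoop APIs that `HadoopKerberosLogin` and `KubernetesHadoopDelegationTokenManager` use in this change. A minimal sketch follows; the secret name `existing-dt-secret` and item key `hadoop-tokens` are illustrative and must match the values passed to `spark.kubernetes.kerberos.tokenSecret.name` and `spark.kubernetes.kerberos.tokenSecret.itemKey` at submit time.

```scala
import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil

object PrePopulatedTokenSecret {
  // Illustrative values; they must match spark.kubernetes.kerberos.tokenSecret.name
  // and spark.kubernetes.kerberos.tokenSecret.itemKey passed to spark-submit.
  val secretName = "existing-dt-secret"
  val itemKey = "hadoop-tokens"

  // Serialize the submitting user's current Hadoop credentials (obtained via kinit
  // or a keytab login) and wrap them in a Secret, mirroring HadoopKerberosLogin.buildSpec.
  def build(): Secret = {
    val creds: Credentials = UserGroupInformation.getCurrentUser.getCredentials
    val tokenData: Array[Byte] = SparkHadoopUtil.get.serialize(creds)
    new SecretBuilder()
      .withNewMetadata()
        .withName(secretName)
        .endMetadata()
      .addToData(itemKey, Base64.encodeBase64String(tokenData))
      .build()
  }
}
```

When such a Secret is referenced, `HadoopBootstrapUtil.bootstrapKerberosPod` (above) mounts it under `SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR` and points `HADOOP_TOKEN_FILE_LOCATION` at the item key, so no keytab needs to be shipped with the job.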
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 8f3f18ffadc3b..b0b53321abd25 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep} +import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( @@ -51,7 +51,11 @@ private[spark] class KubernetesDriverBuilder( provideJavaStep: ( KubernetesConf[KubernetesDriverSpecificConf] => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_)) { + new JavaDriverFeatureStep(_), + provideHadoopGlobalStep: ( + KubernetesConf[KubernetesDriverSpecificConf] + => KerberosConfDriverFeatureStep) = + new KerberosConfDriverFeatureStep(_)) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { @@ -80,8 +84,14 @@ private[spark] class KubernetesDriverBuilder( provideRStep(kubernetesConf)} .getOrElse(provideJavaStep(kubernetesConf)) - val allFeatures = (baseFeatures :+ bindingsStep) ++ - secretFeature ++ envSecretFeature ++ volumesFeature + val maybeHadoopConfigStep = + kubernetesConf.hadoopConfSpec.map { _ => + provideHadoopGlobalStep(kubernetesConf)} + + val allFeatures: Seq[KubernetesFeatureConfigStep] = + (baseFeatures :+ bindingsStep) ++ + secretFeature ++ envSecretFeature ++ volumesFeature ++ + maybeHadoopConfigStep.toSeq var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) for (feature <- allFeatures) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 364b6fb367722..6199a8ae30430 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler.cluster.k8s import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep} private[spark] class KubernetesExecutorBuilder( provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf]) @@ -35,10 +35,26 @@ private[spark] class KubernetesExecutorBuilder( new LocalDirsFeatureStep(_), provideVolumesStep: (KubernetesConf[_ <: 
KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = - new MountVolumesFeatureStep(_)) { + new MountVolumesFeatureStep(_), + provideHadoopConfStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => HadoopConfExecutorFeatureStep) = + new HadoopConfExecutorFeatureStep(_), + provideKerberosConfStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => KerberosConfExecutorFeatureStep) = + new KerberosConfExecutorFeatureStep(_), + provideHadoopSparkUserStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => HadoopSparkUserExecutorFeatureStep) = + new HadoopSparkUserExecutorFeatureStep(_)) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { + val sparkConf = kubernetesConf.sparkConf + val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) + val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME) + val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY) val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { @@ -51,7 +67,23 @@ private[spark] class KubernetesExecutorBuilder( Seq(provideVolumesStep(kubernetesConf)) } else Nil - val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature + val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ => + val maybeKerberosStep = + if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) { + provideKerberosConfStep(kubernetesConf) + } else { + provideHadoopSparkUserStep(kubernetesConf) + } + Seq(provideHadoopConfStep(kubernetesConf)) :+ + maybeKerberosStep + }.getOrElse(Seq.empty[KubernetesFeatureConfigStep]) + + val allFeatures: Seq[KubernetesFeatureConfigStep] = + baseFeatures ++ + secretFeature ++ + secretEnvFeature ++ + volumesFeature ++ + maybeHadoopConfFeatureSteps var executorPod = SparkPod.initialPod() for (feature <- allFeatures) { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index e3c19cdb81567..bb2b94f9976e2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -59,7 +59,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppResource = None, MAIN_CLASS, APP_ARGS, - maybePyFiles = None) + maybePyFiles = None, + hadoopConfDir = None) assert(conf.appId === APP_ID) assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) @@ -81,7 +82,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppJar, MAIN_CLASS, APP_ARGS, - maybePyFiles = None) + maybePyFiles = None, + hadoopConfDir = None) assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars") .split(",") === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar")) @@ -93,7 +95,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppResource = None, MAIN_CLASS, APP_ARGS, - maybePyFiles = None) + maybePyFiles = None, + hadoopConfDir = None) assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") === Array("local:///opt/spark/jar1.jar")) assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1) @@ -114,7 +117,8 @@ class KubernetesConfSuite extends 
SparkFunSuite { mainAppResource, MAIN_CLASS, APP_ARGS, - Some(inputPyFiles.mkString(","))) + Some(inputPyFiles.mkString(",")), + hadoopConfDir = None) assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") === Array("local:///opt/spark/jar1.jar")) assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) @@ -136,7 +140,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppResource, MAIN_CLASS, APP_ARGS, - maybePyFiles = None) + maybePyFiles = None, + hadoopConfDir = None) assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") === Array("local:///opt/spark/jar1.jar")) assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) @@ -158,7 +163,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppResource, MAIN_CLASS, APP_ARGS, - None) + maybePyFiles = None, + hadoopConfDir = None) assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) } @@ -189,7 +195,8 @@ class KubernetesConfSuite extends SparkFunSuite { mainAppResource = None, MAIN_CLASS, APP_ARGS, - maybePyFiles = None) + maybePyFiles = None, + hadoopConfDir = None) assert(conf.roleLabels === Map( SPARK_APP_ID_LABEL -> APP_ID, SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 0968cce971c31..eebdd157da638 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -77,7 +77,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val featureStep = new BasicDriverFeatureStep(kubernetesConf) val basePod = SparkPod.initialPod() @@ -139,7 +140,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(CONTAINER_IMAGE, "spark-driver:latest") val pythonSparkConf = new SparkConf() .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") - .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(CONTAINER_IMAGE, "spark-driver-py:latest") val javaKubernetesConf = KubernetesConf( javaSparkConf, KubernetesDriverSpecificConf( @@ -155,7 +156,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) + val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( @@ -171,12 +174,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) val basePod = SparkPod.initialPod() val configuredJavaPod = javaFeatureStep.configurePod(basePod) val configuredPythonPod = pythonFeatureStep.configurePod(basePod) + assert(configuredJavaPod.container.getImage === "spark-driver:latest") + assert(configuredPythonPod.container.getImage === "spark-driver-py:latest") } test("Additional system properties resolve jars and set cluster-mode confs.") { @@ -198,7 +204,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, 
DRIVER_ENVS, Nil, - allFiles) + allFiles, + hadoopConfSpec = None) val step = new BasicDriverFeatureStep(kubernetesConf) val additionalProperties = step.getAdditionalPodSystemProperties() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 63b237b9dfe46..41f34bd45cd5b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -91,7 +91,8 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. @@ -131,7 +132,8 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -152,7 +154,8 @@ class BasicExecutorFeatureStepSuite Map.empty, Map("qux" -> "quux"), Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) checkEnv(executor, @@ -179,7 +182,8 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index 7e916b3854404..8675ceb48cf6d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -62,7 +62,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty) @@ -94,7 +95,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) @@ -133,7 +135,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val kubernetesCredentialsStep = new 
DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() val expectedSparkConf = Map( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 8b91e93eecd8c..5c3e801501513 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -68,7 +68,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service]) @@ -100,7 +101,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc" @@ -122,7 +124,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String])) + Seq.empty[String], + hadoopConfSpec = None)) val resolvedService = configurationStep .getAdditionalKubernetesResources() .head @@ -153,7 +156,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]), + Seq.empty[String], + hadoopConfSpec = None), clock) val driverService = configurationStep .getAdditionalKubernetesResources() @@ -181,7 +185,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]), + Seq.empty[String], + hadoopConfSpec = None), clock) fail("The driver bind address should not be allowed.") } catch { @@ -207,7 +212,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]), + Seq.empty[String], + hadoopConfSpec = None), clock) fail("The driver host address should not be allowed.") } catch { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 85c6cb282d2b0..43796b77efdc7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -46,7 +46,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ envVarsToKeys, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val step = new EnvSecretsFeatureStep(kubernetesConf) val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index acdd07bc594b2..3a4e60547d7f2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -48,7 +48,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) } test("Resolve to default local dir if neither env nor configuration are set") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index dad610c443acc..18e3d773f690d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -44,7 +44,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) val step = new MountSecretsFeatureStep(kubernetesConf) val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index d309aa94ec115..0d0a5fb951f64 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -36,7 +36,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Nil) + sparkFiles = Nil, + hadoopConfSpec = None) test("Mounts hostPath volumes") { val volumeConf = KubernetesVolumeSpec( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala index bf552aeb8b901..9172e0c3dc408 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala @@ -43,7 +43,8 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new JavaDriverFeatureStep(kubernetesConf) val driverPod = step.configurePod(baseDriverPod).pod diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala index c14af1d3b0f01..2bcc6465b79d6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala @@ -53,7 +53,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new PythonDriverFeatureStep(kubernetesConf) val driverPod = step.configurePod(baseDriverPod).pod @@ -90,7 +91,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new PythonDriverFeatureStep(kubernetesConf) val driverContainerwithPySpark = step.configurePod(baseDriverPod).container val args = driverContainerwithPySpark diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala index ace0faa8629c3..17af6011a17d5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala @@ -47,7 +47,8 @@ class RDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Seq.empty, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new RDriverFeatureStep(kubernetesConf) val driverContainerwithR = step.configurePod(baseDriverPod).container diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e79189ff32..ae13df39b7a76 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -142,7 +142,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(POD_NAME)).thenReturn(namedPods) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 4117c5487a41e..051d7b6994f5d 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep} import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -30,9 +29,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val LOCAL_DIRS_STEP_TYPE = "local-dirs" private val SECRETS_STEP_TYPE = "mount-secrets" private val JAVA_STEP_TYPE = "java-bindings" - private val PYSPARK_STEP_TYPE = "pyspark-bindings" private val R_STEP_TYPE = "r-bindings" + private val PYSPARK_STEP_TYPE = "pyspark-bindings" private val ENV_SECRETS_STEP_TYPE = "env-secrets" + private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( @@ -62,6 +62,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) + private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep]) + private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) @@ -76,7 +79,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => mountVolumesStep, _ => pythonStep, _ => rStep, - _ => javaStep) + _ => javaStep, + _ => hadoopGlobalStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -94,7 +98,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -120,7 +125,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map("EnvName" -> "SecretName:secretKey"), Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -148,7 +154,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -174,7 +181,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -205,7 +213,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( 
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -232,7 +241,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -242,8 +252,71 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { R_STEP_TYPE) } + test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + Some("/var/hadoop-conf"), + None))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + + test("Apply HadoopSteps if HADOOP_CONF ConfigMap is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + None, + Some("pre-defined-configMapName")))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + + private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) - : Unit = { + : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) stepTypes.foreach { stepType => assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 0e617b0021019..b336774838bcb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -162,6 +162,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { } else { val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] val executorSpecificConf = k8sConf.roleSpecificConf + // TODO: HADOOP_CONF_DIR val expectedK8sConf = KubernetesConf.createExecutorConf( conf, executorSpecificConf.executorId, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 44fe4a24e1102..b572dac2bf624 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder import org.apache.spark.{SparkConf, SparkFunSuite} import 
org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ class KubernetesExecutorBuilderSuite extends SparkFunSuite { @@ -27,6 +28,9 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val SECRETS_STEP_TYPE = "mount-secrets" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" + private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step" + private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user" + private val KERBEROS_CONF_STEP_TYPE = "kerberos-step" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( @@ -37,6 +41,12 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep]) + private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep]) + private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep]) + private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep]) private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) @@ -45,7 +55,10 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { _ => mountSecretsStep, _ => envSecretsStep, _ => localDirsStep, - _ => mountVolumesStep) + _ => mountVolumesStep, + _ => hadoopConfStep, + _ => kerberosConf, + _ => hadoopSparkUser) test("Basic steps are consistently applied.") { val conf = KubernetesConf( @@ -60,7 +73,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) } @@ -78,7 +92,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map("secret-name" -> "secret-key"), Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -105,7 +120,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -113,6 +129,64 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { MOUNT_VOLUMES_STEP_TYPE) } + test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") { + // HADOOP_DELEGATION_TOKEN + val HADOOP_CREDS_PREFIX = "spark.security.credentials." 
+ val HADOOPFS_PROVIDER = s"$HADOOP_CREDS_PREFIX.hadoopfs.enabled" + val conf = KubernetesConf( + new SparkConf(false) + .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") + .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") + .set(KERBEROS_SPARK_USER_NAME, "spark-user") + .set(HADOOPFS_PROVIDER, "true"), + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder().build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Some(HadoopConfSpec(Some("/var/hadoop-conf"), None))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + HADOOP_CONF_STEP_TYPE, + HADOOP_SPARK_USER_STEP_TYPE) + } + + test("Apply kerberos step if DT secrets created") { + val conf = KubernetesConf( + new SparkConf(false) + .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") + .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") + .set(KERBEROS_SPARK_USER_NAME, "spark-user") + .set(KERBEROS_DT_SECRET_NAME, "dt-secret") + .set(KERBEROS_DT_SECRET_KEY, "dt-key"), + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder().build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName")))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + HADOOP_CONF_STEP_TYPE, + KERBEROS_CONF_STEP_TYPE) + } + private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size) stepTypes.foreach { stepType => diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 7ae57bf6e42d0..f51b0ffc32fee 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -29,7 +29,7 @@ ARG img_path=kubernetes/dockerfiles RUN set -ex && \ apk upgrade --no-cache && \ - apk add --no-cache bash tini libc6-compat linux-pam && \ + apk add --no-cache bash tini libc6-compat linux-pam krb5 krb5-libs && \ mkdir -p /opt/spark && \ mkdir -p /opt/spark/work-dir && \ touch /opt/spark/RELEASE && \ diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 216e8fe31becb..4958b7363fee0 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -83,6 +83,10 @@ elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then export PYSPARK_DRIVER_PYTHON="python3" fi +if ! [ -z ${HADOOP_CONF_DIR+x} ]; then + SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; +fi + case "$SPARK_K8S_CMD" in driver) CMD=(
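
A note on the driver-side composition in the `KubernetesDriverBuilder` change above: the Kerberos/Hadoop step is strictly optional and is appended only when a `hadoopConfSpec` is present on the `KubernetesConf`. The following is a minimal sketch of that composition pattern; only `KerberosConfDriverFeatureStep` and `hadoopConfSpec` are taken from the diff, while `FeatureStep`, the `HadoopConfSpec` fields, and the helper names are simplified assumptions for illustration.

```scala
// Minimal sketch of the optional-step composition added to KubernetesDriverBuilder.
// Only KerberosConfDriverFeatureStep and hadoopConfSpec come from the diff; the rest
// is simplified for illustration and is not the real Spark API.
trait FeatureStep {
  def configure(labels: Map[String, String]): Map[String, String]
}

// Stand-in for KerberosConfDriverFeatureStep: it only runs when the submission
// carried Hadoop configuration.
final class KerberosConfDriverStepSketch extends FeatureStep {
  override def configure(labels: Map[String, String]): Map[String, String] =
    labels + ("hadoop-global" -> "hadoop-global")
}

final case class HadoopConfSpec(
    hadoopConfDir: Option[String],
    hadoopConfigMapName: Option[String])

def composeDriverSteps(
    baseFeatures: Seq[FeatureStep],
    bindingsStep: FeatureStep,
    hadoopConfSpec: Option[HadoopConfSpec]): Seq[FeatureStep] = {
  // Mirrors the diff: the Hadoop/Kerberos step is appended only when a
  // HADOOP_CONF_DIR or a pre-existing Hadoop ConfigMap was supplied.
  val maybeHadoopConfigStep = hadoopConfSpec.map(_ => new KerberosConfDriverStepSketch)
  (baseFeatures :+ bindingsStep) ++ maybeHadoopConfigStep.toSeq
}
```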
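
On the executor side, the builder inspects three SparkConf entries (the Hadoop ConfigMap name plus the delegation-token secret name and item key) and then chooses between the Kerberos step and the plain spark-user step. A condensed sketch of that decision follows; the inputs correspond to the `getOption` calls in the diff, and the returned strings are placeholder step names rather than the real feature-step classes.

```scala
// Condensed sketch of the executor-side decision added to KubernetesExecutorBuilder.
// The three inputs correspond to sparkConf.getOption(HADOOP_CONFIG_MAP_NAME),
// getOption(KERBEROS_DT_SECRET_NAME) and getOption(KERBEROS_DT_SECRET_KEY); the
// returned strings are placeholders standing in for HadoopConfExecutorFeatureStep,
// KerberosConfExecutorFeatureStep and HadoopSparkUserExecutorFeatureStep.
def selectHadoopExecutorSteps(
    maybeHadoopConfigMap: Option[String],
    maybeDTSecretName: Option[String],
    maybeDTDataItem: Option[String]): Seq[String] = {
  maybeHadoopConfigMap.map { _ =>
    // Mount the delegation-token secret when both its name and item key are known;
    // otherwise only export the Kerberos-resolved spark user to the executor.
    val kerberosOrUserStep =
      if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) {
        "kerberos-conf-executor-step"
      } else {
        "hadoop-spark-user-executor-step"
      }
    Seq("hadoop-conf-executor-step", kerberosOrUserStep)
  }.getOrElse(Seq.empty[String])
}
```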
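
Finally, the test changes thread a new `hadoopConfSpec` argument through every `KubernetesConf` construction, and the new driver-builder tests exercise the two ways it can be populated: from a local HADOOP_CONF_DIR or from a pre-existing ConfigMap named via `spark.kubernetes.hadoop.configMapName`. The sketch below shows how that resolution might look at submission time; the real logic lives in `KubernetesConf.createDriverConf`, which is not part of this hunk, so both the helper and its "at most one source" constraint are assumptions consistent with the tests.

```scala
// Assumed resolution of hadoopConfSpec at submission time; not the actual
// KubernetesConf.createDriverConf implementation.
final case class HadoopConfSpec(
    hadoopConfDir: Option[String],
    hadoopConfigMapName: Option[String])

def resolveHadoopConfSpec(
    hadoopConfDirEnv: Option[String],     // value of the HADOOP_CONF_DIR env var
    preExistingConfigMap: Option[String]  // spark.kubernetes.hadoop.configMapName
  ): Option[HadoopConfSpec] = {
  // Sketch assumption: a submission supplies either a local conf dir or a
  // pre-built ConfigMap, not both.
  require(hadoopConfDirEnv.isEmpty || preExistingConfigMap.isEmpty,
    "sketch assumption: supply HADOOP_CONF_DIR or a ConfigMap name, not both")
  if (hadoopConfDirEnv.isEmpty && preExistingConfigMap.isEmpty) {
    None
  } else {
    Some(HadoopConfSpec(hadoopConfDirEnv, preExistingConfigMap))
  }
}
```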