Closed
Changes from all commits
33 commits
73f2777
initial Driver logic for Hadoop and Kerberos Support
ifilonenko Jun 29, 2018
6069be5
add executors... still need to refactor to use sparkConf exclusively
ifilonenko Jul 1, 2018
000120f
refactored executor logic preparing for e2e testing
ifilonenko Jul 2, 2018
13b3adc
resolved initial comments
ifilonenko Jul 7, 2018
0939738
merge conflicts
ifilonenko Jul 30, 2018
347536e
Merge branch 'spark-master' into secure-hdfs
ifilonenko Aug 7, 2018
c30ad8c
launching driver with kerberos authentication instead of simple
ifilonenko Aug 7, 2018
1697e74
merge conflicts and addition of security context
ifilonenko Aug 20, 2018
4a000d2
fix dockerfile
ifilonenko Aug 20, 2018
719b059
non-effective attempt to solve null UnixUsername error
ifilonenko Aug 29, 2018
fb9e810
move credential get
ifilonenko Aug 29, 2018
e7935f8
current working solution
ifilonenko Sep 4, 2018
aa3779c
merge conflicts
ifilonenko Sep 4, 2018
32c408c
merge conflicts
ifilonenko Sep 7, 2018
3cf644e
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 13, 2018
583a52c
merge conflicts and various additions
ifilonenko Sep 21, 2018
6ae3def
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 21, 2018
78953e6
fixes so tests pass
ifilonenko Sep 21, 2018
73f157f
refactor to handle login logic being used in spark-submit
ifilonenko Sep 26, 2018
367e65b
Merge branch 'spark-master' into secure-hdfs
ifilonenko Sep 27, 2018
5f52a1a
resolve comments and add documentation
ifilonenko Sep 27, 2018
6548ef9
resolved comments
ifilonenko Oct 6, 2018
7f72af5
resolved rest of comments
ifilonenko Oct 6, 2018
4ce00a5
small doc addition
ifilonenko Oct 6, 2018
89063fd
fixes to pass kerberos tests
ifilonenko Oct 7, 2018
e303048
resolve comments
ifilonenko Oct 8, 2018
69840a8
resolve comments
ifilonenko Oct 9, 2018
2108154
style and indentation
ifilonenko Oct 9, 2018
a987a70
resolving comments
ifilonenko Oct 9, 2018
e2f8063
hopefully final comment resolution
ifilonenko Oct 9, 2018
f3a0ffb
style issues
ifilonenko Oct 10, 2018
a958920
included new ability to bake krb5.conf into your docker images and no…
ifilonenko Oct 10, 2018
dd95fca
style check
ifilonenko Oct 10, 2018
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -335,7 +335,7 @@ private[spark] class SparkSubmit extends Logging {
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
@@ -646,7 +646,8 @@ private[spark] class SparkSubmit extends Logging {
}
}

if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
if ((clusterManager == MESOS || clusterManager == KUBERNETES)
&& UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sparkConf)
}

@@ -762,8 +763,8 @@ }
}

// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
// must trick it into thinking we're YARN.
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
private def setRMPrincipal(sparkConf: SparkConf): Unit = {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
41 changes: 41 additions & 0 deletions docs/running-on-kubernetes.md
@@ -820,4 +820,45 @@ specific to Spark on Kubernetes.
This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.krb5.path</code></td>
<td><code>(none)</code></td>
<td>
Specify the local location of the krb5.conf file to be mounted on the driver and executors for Kerberos interaction.
It is important to note that the KDC defined needs to be visible from inside the containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.krb5.configMapName</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted on the driver and executors
for Kerberos interaction. The KDC defined needs to be visible from inside the containers. The ConfigMap must also
be in the same namespace as the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.hadoop.configMapName</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver
and executors for custom Hadoop configuration.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokenSecret.name</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user
to provide any Kerberos credentials for launching a job.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokenSecret.itemKey</code></td>
<td><code>(none)</code></td>
<td>
Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user
to provide any Kerberos credentials for launching a job.
</td>
</tr>
</table>
75 changes: 75 additions & 0 deletions docs/security.md
@@ -722,7 +722,82 @@ with encryption, at least.
The Kerberos login will be periodically renewed using the provided credentials, and new delegation
tokens for supported services will be created.

## Secure Interaction with Kubernetes

When talking to Hadoop-based services behind Kerberos, Spark needs to obtain delegation tokens
so that non-local processes can authenticate. On Kubernetes, these delegation tokens are stored in Secrets that are
shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:

In all cases you must either define the environment variable `HADOOP_CONF_DIR` or set
`spark.kubernetes.hadoop.configMapName`.

It is also important to note that the KDC needs to be visible from inside the containers.

If a user wishes to use a remote `HADOOP_CONF_DIR`, containing the Hadoop configuration files, this can be
achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing ConfigMap.
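
One possible way to pre-create such a ConfigMap from a local Hadoop configuration directory is sketched below; the ConfigMap name `hadoop-conf` and the use of `kubectl` are illustrative, not required by Spark:
```bash
# Illustrative sketch: publish the files in a local HADOOP_CONF_DIR as a ConfigMap.
# Each file in the directory (core-site.xml, hdfs-site.xml, ...) becomes a key in the ConfigMap.
kubectl create configmap hadoop-conf \
  --from-file=$HADOOP_CONF_DIR \
  --namespace=<NAMESPACE_OF_DRIVER_AND_EXECUTOR_PODS>
```
The resulting ConfigMap name would then be passed via `spark.kubernetes.hadoop.configMapName`.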

1. Submitting with a `kinit` that stores a TGT in the local ticket cache:
```bash
/usr/bin/kinit -kt <keytab_file> <username>@<krb5 realm>
/opt/spark/bin/spark-submit \
--deploy-mode cluster \
--class org.apache.spark.examples.HdfsTest \
--master k8s://<KUBERNETES_MASTER_ENDPOINT> \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-hdfs \
--conf spark.kubernetes.container.image=spark:latest \
--conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
<HDFS_FILE_LOCATION>
```
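
Before submitting, it can help to confirm that the TGT is actually present in the local ticket cache; this assumes the standard MIT Kerberos client tools are installed:
```bash
# Show the principal and tickets currently held in the local ticket cache
klist
```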
2. Submitting with a local keytab and principal:
```bash
/opt/spark/bin/spark-submit \
--deploy-mode cluster \
--class org.apache.spark.examples.HdfsTest \
--master k8s://<KUBERNETES_MASTER_ENDPOINT> \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-hdfs \
--conf spark.kubernetes.container.image=spark:latest \
--conf spark.kerberos.keytab=<KEYTAB_FILE> \
--conf spark.kerberos.principal=<PRINCIPAL> \
--conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
<HDFS_FILE_LOCATION>
```

3. Submitting with pre-populated secrets, already existing within the namespace, that contain the delegation tokens:
```bash
/opt/spark/bin/spark-submit \
--deploy-mode cluster \
--class org.apache.spark.examples.HdfsTest \
--master k8s://<KUBERNETES_MASTER_ENDPOINT> \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-hdfs \
--conf spark.kubernetes.container.image=spark:latest \
--conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
--conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
--conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
<HDFS_FILE_LOCATION>
```
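
The secret referenced above must already exist in the driver's namespace. As a rough sketch, assuming a delegation token file has already been obtained with a Hadoop client and written to a local file, it might be stored as follows; the secret name `dt-secret`, the item key `hadoop-tokens`, and the file path are illustrative:
```bash
# Illustrative sketch: store a previously fetched Hadoop delegation token file in a Kubernetes secret.
#   <SECRET_TOKEN_NAME> -> dt-secret      (spark.kubernetes.kerberos.tokenSecret.name)
#   <SECRET_ITEM_KEY>   -> hadoop-tokens  (spark.kubernetes.kerberos.tokenSecret.itemKey)
kubectl create secret generic dt-secret \
  --from-file=hadoop-tokens=/tmp/hadoop.dt \
  --namespace=<NAMESPACE>
```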

3b. Submitting as in (3), but specifying a pre-created krb5 ConfigMap and a pre-created `HADOOP_CONF_DIR` ConfigMap:
```bash
/opt/spark/bin/spark-submit \
--deploy-mode cluster \
--class org.apache.spark.examples.HdfsTest \
--master k8s://<KUBERNETES_MASTER_ENDPOINT> \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-hdfs \
--conf spark.kubernetes.container.image=spark:latest \
--conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
--conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
--conf spark.kubernetes.hadoop.configMapName=<HCONF_CONFIG_MAP_NAME> \
--conf spark.kubernetes.kerberos.krb5.configMapName=<KRB_CONFIG_MAP_NAME> \
local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
<HDFS_FILE_LOCATION>
```
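
The krb5 ConfigMap referenced above must likewise be pre-created in the same namespace. A minimal sketch, assuming the file is stored under the `krb5.conf` key and using an illustrative ConfigMap name; the `HADOOP_CONF_DIR` ConfigMap can be created the same way from the configuration directory, as shown earlier in this section:
```bash
# Illustrative sketch: publish a local krb5.conf as a ConfigMap for
# spark.kubernetes.kerberos.krb5.configMapName.
kubectl create configmap krb5-conf \
  --from-file=krb5.conf=/etc/krb5.conf \
  --namespace=<NAMESPACE>
```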
# Event Logging

If your applications are using event logging, the directory where the event logs go
examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -41,6 +41,8 @@ object HdfsTest {
val end = System.currentTimeMillis()
println(s"Iteration $iter took ${end-start} ms")
}
println(s"File contents: ${file.map(_.toString).take(1).mkString(",").slice(0, 10)}")
println(s"Returned length(s) of: ${file.map(_.length).sum().toString}")
spark.stop()
}
}
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,43 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val KUBERNETES_KERBEROS_KRB5_FILE =
ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
.doc("Specify the local location of the krb5.conf file to be mounted on the driver " +
"and executors for Kerberos. Note: The KDC defined needs to be " +
"visible from inside the containers ")
.stringConf
.createOptional

val KUBERNETES_KERBEROS_KRB5_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.kerberos.krb5.configMapName")
.doc("Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted " +
"on the driver and executors for Kerberos. Note: The KDC defined" +
"needs to be visible from inside the containers ")
.stringConf
.createOptional

val KUBERNETES_HADOOP_CONF_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.hadoop.configMapName")
.doc("Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, " +
"to be mounted on the driver and executors for custom Hadoop configuration.")
.stringConf
.createOptional

val KUBERNETES_KERBEROS_DT_SECRET_NAME =
ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.name")
.doc("Specify the name of the secret where your existing delegation tokens are stored. " +
"This removes the need for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.itemKey")
.doc("Specify the item key of the data where your existing delegation tokens are stored. " +
"This removes the need for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -60,11 +60,13 @@ private[spark] object Constants {
val ENV_CLASSPATH = "SPARK_CLASSPATH"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
val ENV_SPARK_USER = "SPARK_USER"
// Spark app configs for containers
val SPARK_CONF_VOLUME = "spark-conf-volume"
val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
@@ -78,4 +80,29 @@ private[spark] object Constants {
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_MIN_MIB = 384L

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
val KRB_FILE_VOLUME = "krb5-file"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KRB_FILE_DIR_PATH = "/etc"
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB5_CONFIG_MAP_NAME =
"spark.kubernetes.executor.krb5ConfigMapName"

// Kerberos Configuration
val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
val KERBEROS_DT_SECRET_NAME =
"spark.kubernetes.kerberos.dt-secret-name"
val KERBEROS_DT_SECRET_KEY =
"spark.kubernetes.kerberos.dt-secret-key"
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
}
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -19,12 +19,15 @@ package org.apache.spark.deploy.k8s
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.ConfigEntry


@@ -47,6 +50,13 @@ private[spark] case class KubernetesExecutorSpecificConf(
driverPod: Option[Pod])
extends KubernetesRoleSpecificConf

/*
* Structure containing metadata for HADOOP_CONF_DIR customization
*/
private[spark] case class HadoopConfSpec(
hadoopConfDir: Option[String],
hadoopConfigMapName: Option[String])

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
*/
@@ -61,7 +71,15 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
sparkFiles: Seq[String]) {
sparkFiles: Seq[String],
hadoopConfSpec: Option[HadoopConfSpec]) {

def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"

def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"

def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(new HadoopDelegationTokenManager(conf, hConf))

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -116,7 +134,8 @@ private[spark] object KubernetesConf {
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
maybePyFiles: Option[String],
hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
@@ -175,6 +194,19 @@ private[spark] object KubernetesConf {
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
hadoopConfDir,
hadoopConfigMapName,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous" )
val hadoopConfSpec =
if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
} else {
None
}

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -186,7 +218,8 @@ private[spark] object KubernetesConf {
driverSecretEnvNamesToKeyRefs,
driverEnvs,
driverVolumes,
sparkFiles)
sparkFiles,
hadoopConfSpec)
}

def createExecutorConf(
@@ -242,6 +275,7 @@ private[spark] object KubernetesConf {
executorEnvSecrets,
executorEnv,
executorVolumes,
Seq.empty[String])
Seq.empty[String],
None)
}
}
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -39,8 +39,27 @@ private[spark] object KubernetesUtils {
sparkConf.getAllWithPrefix(prefix).toMap
}

def requireBothOrNeitherDefined(
opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

def requireSecondIfFirstIsDefined(
opt1: Option[_],
opt2: Option[_],
errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}
