From 626318d8bd89bc2f5121b8cf11d5bea07f426b1c Mon Sep 17 00:00:00 2001 From: mccheah Date: Tue, 13 Jan 2015 18:58:26 -0800 Subject: [PATCH] [SPARK-5158] Spark standalone mode can authenticate against a Kerberos-secured Hadoop cluster Previously, Kerberos secured Hadoop clusters could only be accessed by Spark running on top of YARN. In other words, Spark standalone clusters had no way to read from secure Hadoop clusters. Other solutions were proposed previously, but all of them attempted to perform authentication by obtaining a token on a single node and passing that token around to all of the other Spark worker nodes. The shipping of the token is risky, and all previous iterations fell short in leaving the token open to man-in-the-middle attacks. This patch introduces an alternative approach. It assumes that the keytab file has already been distributed to every node in the cluster. When Spark starts in standalone mode, all of the workers individually log in via Kerberos using specified configurations in the driver's SparkConf. In addition, on basic Hadoop cluster setups the key tab file is often already manually deployed on all of the cluster's nodes; it's not a huge stretch to expect the keytab files to be deployed to the Spark worker nodes as well, if they are not already there. This assumes that Spark will always authenticate with Kerberos using the same principal and keytab, and the login is done at the very start of the job. Strictly speaking we should be trying to reduce the surface area of the region of code that operates under a logged-in state. Or to put it another way, the authentication should only be performed precisely when files are written or read from HDFS, and after the read or write is performed the subject should be logged out. However this is difficult to write and prone to errors, so this is left for a future refactor. --- .../apache/spark/deploy/SparkHadoopUtil.scala | 15 ++-- .../deploy/StandaloneSparkHadoopUtil.scala | 82 +++++++++++++++++++ .../CoarseGrainedExecutorBackend.scala | 62 +++++++------- .../org/apache/spark/rdd/HadoopRDD.scala | 1 + .../apache/spark/rdd/PairRDDFunctions.scala | 1 + .../deploy/yarn/YarnSparkHadoopUtil.scala | 8 ++ 6 files changed, 132 insertions(+), 37 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e0a32fb65cd51..fc33ad0ab7f17 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -39,9 +39,7 @@ import scala.collection.JavaConversions._ * Contains util methods to interact with Hadoop from Spark. */ @DeveloperApi -class SparkHadoopUtil extends Logging { - val conf: Configuration = newConfiguration(new SparkConf()) - UserGroupInformation.setConfiguration(conf) +abstract class SparkHadoopUtil extends Logging { /** * Runs the given function with a Hadoop UserGroupInformation as a thread local variable @@ -54,13 +52,16 @@ class SparkHadoopUtil extends Logging { def runAsSparkUser(func: () => Unit) { val user = Utils.getCurrentUserName() logDebug("running as user: " + user) - val ugi = UserGroupInformation.createRemoteUser(user) + val ugi = getAuthenticatedUgiForSparkUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) } + // Log in as the Spark user, if necessary. + def loginAsSparkUser() {} + def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { for (token <- source.getTokens()) { dest.addToken(token) @@ -121,6 +122,8 @@ class SparkHadoopUtil extends Logging { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } + protected def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation + /** * Returns a function that can be called to find Hadoop FileSystem bytes read. If * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will @@ -205,7 +208,7 @@ class SparkHadoopUtil extends Logging { object SparkHadoopUtil { - private val hadoop = { + private lazy val hadoop = { val yarnMode = java.lang.Boolean.valueOf( System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) if (yarnMode) { @@ -217,7 +220,7 @@ object SparkHadoopUtil { case e: Exception => throw new SparkException("Unable to load YARN support", e) } } else { - new SparkHadoopUtil + new StandaloneSparkHadoopUtil } } diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala new file mode 100644 index 0000000000000..2611d8a84ca0a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.security.PrivilegedAction + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.{SparkConf, SparkEnv, Logging} +import sun.security.jgss.krb5.Krb5InitCredential + +import scala.sys.process.Process + +private[spark] class StandaloneSparkHadoopUtil extends SparkHadoopUtil { + + val hadoopSecurityAuthenticationKey = "spark.hadoop.security.authentication" + val principalKey = "spark.hadoop.dfs.namenode.kerberos.principal" + val keytabKey = "spark.hadoop.dfs.namenode.keytab.file" + val securityEnabledButKeyNotDefined = "Hadoop security was enabled, but %s" + + "was not set in the Spark configuration." + + // Lazily evaluated upon invoking loginAsSparkUserAndReturnUGI() + lazy val loggedInUser: UserGroupInformation = { + val authenticationType = SparkEnv.get.conf.get(hadoopSecurityAuthenticationKey, "simple") + if (authenticationType.equalsIgnoreCase("kerberos")) { + logInfo("Setting up kerberos to authenticate") + SparkEnv.get.conf.getOption(principalKey) match { + case Some(principal) => + SparkEnv.get.conf.getOption(keytabKey) match { + case Some(keytab) => + UserGroupInformation.setConfiguration(newConfiguration(SparkEnv.get.conf)) + loginUserFromKeytab(principal, keytab) + UserGroupInformation.getLoginUser() + case None => + val errorMsg = securityEnabledButKeyNotDefined.format(keytabKey) + logError(errorMsg) + throw new IllegalStateException(errorMsg) + } + case None => + val errorMsg = securityEnabledButKeyNotDefined.format(principalKey) + logError(errorMsg) + throw new IllegalStateException(errorMsg) + } + } else { + logInfo("Not using Kerberos to authenticate to Hadoop.") + UserGroupInformation.getCurrentUser() + } + } + override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = { + UserGroupInformation.createProxyUser(user, loginAsSparkUserAndReturnUGI()) + } + + override def loginAsSparkUser() { + loginAsSparkUserAndReturnUGI() + } + + private def loginAsSparkUserAndReturnUGI(): UserGroupInformation = loggedInUser + + override def newConfiguration(sparkConf: SparkConf): Configuration = { + val originalConf = super.newConfiguration(sparkConf) + originalConf.set("hadoop.security.authentication", + sparkConf.get(hadoopSecurityAuthenticationKey, "simple")) + originalConf + } + +} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index dd19e4947db1e..c2acc972cda90 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -125,39 +125,39 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SignalLogger.register(log) - SparkHadoopUtil.get.runAsSparkUser { () => - // Debug code - Utils.checkHost(hostname) - - // Bootstrap to fetch the driver's Spark properties. - val executorConf = new SparkConf - val port = executorConf.getInt("spark.executor.port", 0) - val (fetcher, _) = AkkaUtils.createActorSystem( - "driverPropsFetcher", - hostname, - port, - executorConf, - new SecurityManager(executorConf)) - val driver = fetcher.actorSelection(driverUrl) - val timeout = AkkaUtils.askTimeout(executorConf) - val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) - val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++ - Seq[(String, String)](("spark.app.id", appId)) - fetcher.shutdown() - - // Create SparkEnv using properties we fetched from the driver. - val driverConf = new SparkConf() - for ((key, value) <- props) { - // this is required for SSL in standalone mode - if (SparkConf.isExecutorStartupConf(key)) { - driverConf.setIfMissing(key, value) - } else { - driverConf.set(key, value) - } + // Debug code + Utils.checkHost(hostname) + + // Bootstrap to fetch the driver's Spark properties. + val executorConf = new SparkConf + val port = executorConf.getInt("spark.executor.port", 0) + val (fetcher, _) = AkkaUtils.createActorSystem( + "driverPropsFetcher", + hostname, + port, + executorConf, + new SecurityManager(executorConf)) + val driver = fetcher.actorSelection(driverUrl) + val timeout = AkkaUtils.askTimeout(executorConf) + val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) + val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++ + Seq[(String, String)](("spark.app.id", appId)) + fetcher.shutdown() + + // Create SparkEnv using properties we fetched from the driver. + val driverConf = new SparkConf() + for ((key, value) <- props) { + // this is required for SSL in standalone mode + if (SparkConf.isExecutorStartupConf(key)) { + driverConf.setIfMissing(key, value) + } else { + driverConf.set(key, value) } - val env = SparkEnv.createExecutorEnv( - driverConf, executorId, hostname, port, cores, isLocal = false) + } + val env = SparkEnv.createExecutorEnv( + driverConf, executorId, hostname, port, cores, isLocal = false) + SparkHadoopUtil.get.runAsSparkUser { () => // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore. val boundPort = env.conf.getInt("spark.executor.port", 0) assert(boundPort != 0) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 486e86ce1bb19..a4abaff1443a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -196,6 +196,7 @@ class HadoopRDD[K, V]( val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) + SparkHadoopUtil.get.loginAsSparkUser() val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(jobConf) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 955b42c3baaa1..76fe8e436f4eb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1043,6 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) throw new SparkException("Output value class not set") } SparkHadoopUtil.get.addCredentials(hadoopConf) + SparkHadoopUtil.get.loginAsSparkUser() logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 146b2c0f1a302..23946e4ba30e2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -42,6 +42,8 @@ import org.apache.spark.util.Utils * Contains util methods to interact with Hadoop from spark. */ class YarnSparkHadoopUtil extends SparkHadoopUtil { + val sparkConf = new SparkConf() + UserGroupInformation.setConfiguration(newConfiguration(sparkConf)) override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { dest.addCredentials(source.getCredentials()) @@ -71,6 +73,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { UserGroupInformation.getCurrentUser().addCredentials(creds) } + override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = { + val ugi = UserGroupInformation.createRemoteUser(user) + transferCredentials(UserGroupInformation.getCurrentUser, ugi) + ugi + } + override def addSecretKeyToUserCredentials(key: String, secret: String) { val creds = new Credentials() creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))