From 1e89e78d18b87acdeb15afe1ccaa92887ee15b74 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Fri, 28 Mar 2014 14:32:33 -0700 Subject: [PATCH 1/2] Added support for accessing secured HDFS Also changed the way task run so tasks always run under the user who submit the tasks. This replaces the old approach of using a environment variable SPARK_USER to specify the user, which is far less flexible. This eases security management since users no longer need to open access to HDFS files under their home directory to the user who starts the Spark cluster. Signed-off-by: Yinan Li --- .../scala/org/apache/spark/SparkContext.scala | 5 + .../apache/spark/deploy/SparkHadoopUtil.scala | 171 +++++++++++++++++- .../org/apache/spark/executor/Executor.scala | 19 +- .../org/apache/spark/scheduler/Task.scala | 13 +- .../spark/scheduler/TaskSetManager.scala | 3 +- docs/configuration.md | 35 ++++ 6 files changed, 237 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b23accbbb9410..7a1d5ce3dc086 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -221,6 +221,11 @@ class SparkContext( } executorEnvs("SPARK_USER") = sparkUser + // Need to do security authentication when Hadoop security is turned on + if (SparkHadoopUtil.get.isSecurityEnabled()) { + SparkHadoopUtil.get.doUserAuthentication(this) + } + // Create and start the scheduler private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master) taskScheduler.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 9bdbfb33bf54f..84468a0d6538e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,17 +24,25 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark._ import scala.collection.JavaConversions._ +import java.util.{Collection, TimerTask, Timer} +import java.io.{File, IOException} +import org.apache.hadoop.fs.{FileSystem, Path} +import java.net.URI +import org.apache.hadoop.security.token.{TokenIdentifier, Token} +import org.apache.hadoop.fs.permission.FsPermission /** * Contains util methods to interact with Hadoop from Spark. */ -class SparkHadoopUtil { +class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration() UserGroupInformation.setConfiguration(conf) + val sparkConf = new SparkConf() + def runAsUser(user: String)(func: () => Unit) { if (user != SparkContext.SPARK_UNKNOWN_USER) { val ugi = UserGroupInformation.createRemoteUser(user) @@ -75,6 +83,165 @@ class SparkHadoopUtil { def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null } + /** + * Return whether Hadoop security is enabled or not. + * + * @return Whether Hadoop security is enabled or not + */ + def isSecurityEnabled(): Boolean = { + UserGroupInformation.isSecurityEnabled + } + + /** + * Do user authentication when Hadoop security is turned on. Used by the driver. + * + * @param sc Spark context + */ + def doUserAuthentication(sc: SparkContext) { + getAuthenticationType match { + case "keytab" => { + // Authentication through a Kerberos keytab file. Necessary for + // long-running services like Shark/Spark Streaming. + scheduleKerberosRenewTask(sc) + } + case _ => { + // No authentication needed. Assuming authentication is already done + // before Spark is launched, e.g., the user has authenticated with + // Kerberos through kinit already. + // Renew a Hadoop delegation token and store the token into a file. + // Add the token file so it gets downloaded by every slave nodes. + sc.addFile(initDelegationToken().toString) + } + } + } + + /** + * Get the user whom the task belongs to. + * + * @param userName Name of the user whom the task belongs to + * @return The user whom the task belongs to + */ + def getTaskUser(userName: String): UserGroupInformation = { + val ugi = UserGroupInformation.createRemoteUser(userName) + // Change the authentication method to Kerberos + ugi.setAuthenticationMethod( + UserGroupInformation.AuthenticationMethod.KERBEROS) + // Get and add Hadoop delegation tokens for the user + val iter = getDelegationTokens().iterator() + while (iter.hasNext) { + ugi.addToken(iter.next()) + } + + ugi + } + + /** + * Get the type of Hadoop security authentication. + * + * @return Type of Hadoop security authentication + */ + private def getAuthenticationType: String = { + sparkConf.get("spark.hadoop.security.authentication") + } + + /** + * Schedule a timer task for automatically renewing Kerberos credential. + * + * @param sc Spark context + */ + private def scheduleKerberosRenewTask(sc: SparkContext) { + val kerberosRenewTimer = new Timer() + val kerberosRenewTimerTask = new TimerTask { + def run() { + try { + kerberosLoginFromKeytab + // Renew a Hadoop delegation token and store the token into a file. + // Add the token file so it gets downloaded by every slave nodes. + sc.addFile(initDelegationToken().toString) + } catch { + case ioe: IOException => { + logError("Failed to login from Kerberos keytab", ioe) + } + } + } + } + + val interval = sparkConf.getLong( + "spark.hadoop.security.kerberos.renewInterval", 21600000) + kerberosRenewTimer.schedule(kerberosRenewTimerTask, 0, interval) + logInfo("Scheduled timer task for renewing Kerberos credential") + } + + /** + * Log a user in from a keytab file. Loads user credential from a keytab + * file and logs the user in. + */ + private def kerberosLoginFromKeytab() { + val defaultKeytab = System.getProperty("user.home") + Path.SEPARATOR + + System.getProperty("user.name") + ".keytab" + val keytab = sparkConf.get( + "spark.hadoop.security.kerberos.keytab", defaultKeytab) + val principal = sparkConf.get( + "spark.hadoop.security.kerberos.principal", System.getProperty("user.name")) + + // Keytab file not found + if (!new File(keytab).exists()) { + throw new IOException("Keytab file %s not found".format(keytab)) + } + + UserGroupInformation.loginUserFromKeytab(principal, keytab) + } + + /** + * Initialize a Hadoop delegation token, store the token into a file, + * and add it to the SparkContext so executors can get it. + * + * @return URI of the token file + */ + private def initDelegationToken(): URI = { + val localFS = FileSystem.getLocal(conf) + // Store the token file under user's home directory + val tokenFile = new Path(localFS.getHomeDirectory, sparkConf.get( + "spark.hadoop.security.token.name", "spark.token")) + if (localFS.exists(tokenFile)) { + localFS.delete(tokenFile, false) + } + + // Get a new token and write it to the given token file + val currentUser = UserGroupInformation.getCurrentUser + val fs = FileSystem.get(conf) + val token: Token[_ <: TokenIdentifier] = + fs.getDelegationToken(currentUser.getShortUserName) + .asInstanceOf[Token[_ <: TokenIdentifier]] + val cred = new Credentials() + cred.addToken(token.getService, token) + cred.writeTokenStorageFile(tokenFile, conf) + // Make sure the token file is read-only to the owner + localFS.setPermission(tokenFile, FsPermission.createImmutable(0400)) + + logInfo("Stored Hadoop delegation token for user %s to file %s".format( + currentUser.getShortUserName, tokenFile.toUri.toString)) + tokenFile.toUri + } + + /** + * Get delegation tokens from the token file added through SparkContext.addFile(). + * + * @return Collection of delegation tokens + */ + private def getDelegationTokens(): Collection[Token[_ <: TokenIdentifier]] = { + // Get the token file added through SparkContext.addFile() + val source = new File(SparkFiles.get(sparkConf.get( + "spark.hadoop.security.token.name", "spark.token"))) + if (source.exists()) { + val sourcePath = new Path("file://" + source.getAbsolutePath) + // Read credentials from the token file + Credentials.readTokenStorageFile(sourcePath, conf).getAllTokens + } else { + throw new IOException( + "Token file %s does not exist".format(source.getAbsolutePath)) + } + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index aecb069e4202b..5def5edae123a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util.{AkkaUtils, Utils} +import java.security.PrivilegedExceptionAction /** * Spark executor used with Mesos, YARN, and the standalone scheduler. @@ -173,7 +174,7 @@ private[spark] class Executor( } } - override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => + override def run() { val startTime = System.currentTimeMillis() SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) @@ -188,7 +189,7 @@ private[spark] class Executor( try { SparkEnv.set(env) Accumulators.clear() - val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) + val (userName, taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) @@ -208,7 +209,19 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() - val value = task.run(taskId.toInt) + var value: Any = None + if (SparkHadoopUtil.get.isSecurityEnabled()) { + // Get the user whom the task belongs to + val ugi = SparkHadoopUtil.get.getTaskUser(userName) + // Run the task as the user whom the task belongs to + ugi.doAs(new PrivilegedExceptionAction[Unit] { + def run(): Unit = { + value = task.run(taskId.toInt) + } + }) + } else { + value = task.run(taskId.toInt) + } val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index b85b4a50cd93a..fbab1773bd921 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -98,6 +98,7 @@ private[spark] object Task { * Serialize a task and the current app dependencies (files and JARs added to the SparkContext) */ def serializeWithDependencies( + userName: String, task: Task[_], currentFiles: HashMap[String, Long], currentJars: HashMap[String, Long], @@ -107,6 +108,9 @@ private[spark] object Task { val out = new FastByteArrayOutputStream(4096) val dataOut = new DataOutputStream(out) + // Write the name of the user launching the task + dataOut.writeUTF(userName) + // Write currentFiles dataOut.writeInt(currentFiles.size) for ((name, timestamp) <- currentFiles) { @@ -134,14 +138,17 @@ private[spark] object Task { * and return the task itself as a serialized ByteBuffer. The caller can then update its * ClassLoaders and deserialize the task. * - * @return (taskFiles, taskJars, taskBytes) + * @return (userName, taskFiles, taskJars, taskBytes) */ def deserializeWithDependencies(serializedTask: ByteBuffer) - : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { + : (String, HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { val in = new ByteBufferInputStream(serializedTask) val dataIn = new DataInputStream(in) + // Read the name of the user launching the task + val userName = dataIn.readUTF() + // Read task's files val taskFiles = new HashMap[String, Long]() val numFiles = dataIn.readInt() @@ -158,6 +165,6 @@ private[spark] object Task { // Create a sub-buffer for the rest of the data, which is the serialized Task object val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task - (taskFiles, taskJars, subBuffer) + (userName, taskFiles, taskJars, subBuffer) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 86d2050a03f18..6ba2207bd8080 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -410,10 +410,11 @@ private[spark] class TaskSetManager( lastLaunchTime = curTime // Serialize and return the task val startTime = clock.getTime() + val userName = System.getProperty("user.name") // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here // we assume the task can be serialized without exceptions. val serializedTask = Task.serializeWithDependencies( - task, sched.sc.addedFiles, sched.sc.addedJars, ser) + userName, task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = clock.getTime() - startTime addRunningTask(taskId) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( diff --git a/docs/configuration.md b/docs/configuration.md index 1ff0150567255..b6f30d8d6dc5c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -586,6 +586,41 @@ Apart from these, the following properties are also available, and may be useful Number of cores to allocate for each task. + + spark.hadoop.security.authentication + (none) + + Method used for authenticating user when Hadoop security is turned on. A Hadoop delegation token can be obtained only after the user is authenticated. + + + + spark.hadoop.security.kerberos.renewInterval + 21600000 + + Interval for automatically renewing the Kerberos credential when Hadoop security is turned on and Kerberos is the method for user authentication. + + + + spark.hadoop.security.kerberos.keytab + {Current login user name}.keytab under the home directory of the current login user + + Local path of the Kerberos keytab file. The keytab usually is located on the gateway host to the Spark cluster. + + + + spark.hadoop.security.kerberos.principal + Current login user name + + Principal used for Kerberos login. + + + + spark.hadoop.security.token.name + spark.token + + Name of the file storing the Hadoop delegation token obtained by the driver. + + ## Viewing Spark Properties From 3f5d7d6924c294bb6fc3f21afba425e9ed5b47e3 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Fri, 28 Mar 2014 14:59:19 -0700 Subject: [PATCH 2/2] Fixed style error Signed-off-by: Yinan Li --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5def5edae123a..83e6dc3e4d3b1 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -189,7 +189,8 @@ private[spark] class Executor( try { SparkEnv.set(env) Accumulators.clear() - val (userName, taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) + val (userName, taskFiles, taskJars, taskBytes) = + Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)