diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index 617954cb98..cd60dd60a2 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -16,14 +16,24 @@
  */
 package spark.deploy
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import java.io.IOException
+import java.security.PrivilegedExceptionAction

 /**
  * Contains util methods to interact with Hadoop from spark.
  */
 object SparkHadoopUtil {
+  val HDFS_TOKEN_KEY = "SPARK_HDFS_TOKEN"
+  val conf = newConfiguration()
+  UserGroupInformation.setConfiguration(conf)

   def getUserNameFromEnvironment(): String = {
     // defaulting to -D ...
@@ -31,9 +41,38 @@ object SparkHadoopUtil {
   }

   def runAsUser(func: (Product) => Unit, args: Product) {
+    runAsUser(func, args, getUserNameFromEnvironment)
+  }
+
+  def runAsUser(func: (Product) => Unit, args: Product, user: String) {
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    if (UserGroupInformation.isSecurityEnabled) {
+      Option(System.getenv(HDFS_TOKEN_KEY)) match {
+        case Some(s) =>
+          ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
+          val token = new Token[TokenIdentifier]()
+          token.decodeFromUrlString(s)
+          ugi.addToken(token)
+        case None => throw new IOException("Failed to get token in security environment")
+      }
+    }

-    // Add support, if exists - for now, simply run func !
-    func(args)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = {
+        func(args)
+      }
+    })
+  }
+
+  def createSerializedToken(): Option[String] = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val fs = FileSystem.get(conf)
+      val user = UserGroupInformation.getCurrentUser.getShortUserName
+      Option(fs.getDelegationToken(user).asInstanceOf[Token[_ <: TokenIdentifier]])
+        .map(_.encodeToUrlString())
+    } else {
+      None
+    }
   }

   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
@@ -43,5 +82,4 @@ object SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}

   def isYarnMode(): Boolean = { false }
-
 }
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index 6122fdced0..b614699f98 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -48,6 +48,8 @@ object SparkHadoopUtil {
     func(args)
   }

+  def createSerializedToken(): Option[String] = None
+
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
   def isYarnMode(): Boolean = {
     val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index 617954cb98..3e0fc038ec 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -16,14 +16,23 @@
  */
 package spark.deploy
+
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import java.io.IOException
+import java.security.PrivilegedExceptionAction

 /**
  * Contains util methods to interact with Hadoop from spark.
  */
 object SparkHadoopUtil {
+  val HDFS_TOKEN_KEY = "SPARK_HDFS_TOKEN"
+  val conf = newConfiguration()
+  UserGroupInformation.setConfiguration(conf)

   def getUserNameFromEnvironment(): String = {
     // defaulting to -D ...
@@ -31,9 +40,39 @@ object SparkHadoopUtil {
   }

   def runAsUser(func: (Product) => Unit, args: Product) {
+    runAsUser(func, args, getUserNameFromEnvironment())
+  }
+
+  def runAsUser(func: (Product) => Unit, args: Product, user: String) {
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    if (UserGroupInformation.isSecurityEnabled) {
+      Option(System.getenv(HDFS_TOKEN_KEY)) match {
+        case Some(s) =>
+          ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
+          val token = new Token[TokenIdentifier]()
+          token.decodeFromUrlString(s)
+          ugi.addToken(token)
+        case None => throw new IOException("Failed to get token in security environment")
+
+      }
+    }

-    // Add support, if exists - for now, simply run func !
-    func(args)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = {
+        func(args)
+      }
+    })
+  }
+
+  def createSerializedToken(): Option[String] = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val fs = FileSystem.get(conf)
+      val user = UserGroupInformation.getCurrentUser.getShortUserName
+      Option(fs.getDelegationToken(user).asInstanceOf[Token[_ <: TokenIdentifier]])
+        .map(_.encodeToUrlString())
+    } else {
+      None
+    }
   }

   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
@@ -43,5 +82,4 @@ object SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}

   def isYarnMode(): Boolean = { false }
-
 }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77cb0ee0cd..01be105b08 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -53,7 +53,6 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation

 import org.apache.mesos.MesosNativeLibrary

@@ -145,6 +144,11 @@ class SparkContext(
     executorEnvs ++= environment
   }

+  // Add the serialized token to the environment variables passed to executors, for secure Hadoop access
+  SparkHadoopUtil.createSerializedToken() map { e =>
+    executorEnvs(SparkHadoopUtil.HDFS_TOKEN_KEY) = e
+  }
+
   // Create and start the scheduler
   private var taskScheduler: TaskScheduler = {
     // Regular expression used for local[N] master format
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 345dfe879c..34bc021b02 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -91,6 +91,7 @@ private[spark] class ExecutorRunner(
       case "{{EXECUTOR_ID}}" => execId.toString
       case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1
       case "{{CORES}}" => cores.toString
+      case "{{USER}}" => appDesc.user
       case other => other
     }
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e47fe50021..7be0b8ebde 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -33,7 +33,8 @@ private[spark] class StandaloneExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
-    cores: Int)
+    cores: Int,
+    user: String)
   extends Actor
   with ExecutorBackend
   with Logging {
@@ -81,20 +82,21 @@ private[spark] class StandaloneExecutorBackend(
 }

 private[spark] object StandaloneExecutorBackend {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
-    SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int, user: String) {
+    SparkHadoopUtil.runAsUser(run0, Tuple5[Any, Any, Any, Any, Any] (driverUrl, executorId, hostname, cores, user), user)
   }

   // This will be run 'as' the user
   def run0(args: Product) {
-    assert(4 == args.productArity)
+    assert(5 == args.productArity)
     runImpl(args.productElement(0).asInstanceOf[String],
       args.productElement(1).asInstanceOf[String],
       args.productElement(2).asInstanceOf[String],
-      args.productElement(3).asInstanceOf[Int])
+      args.productElement(3).asInstanceOf[Int],
+      args.productElement(4).asInstanceOf[String])
   }

-  private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+  private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int, user: String) {
     // Debug code
     Utils.checkHost(hostname)

@@ -105,17 +107,17 @@ private[spark] object StandaloneExecutorBackend {
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores, user)),
       name = "Executor")
     actorSystem.awaitTermination()
   }

   def main(args: Array[String]) {
-    if (args.length < 4) {
+    if (args.length < 5) {
       //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
-      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
+      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> <user> [<appid>]")
       System.exit(1)
     }
-    run(args(0), args(1), args(2), args(3).toInt)
+    run(args(0), args(1), args(2), args(3).toInt, args(4))
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 55d6c0a47e..05b46ad185 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -44,7 +44,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{USER}}")
     val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(
       throw new IllegalArgumentException("must supply spark home for spark standalone"))
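For reviewers following the token hand-off end to end, the sketch below (not part of the patch) condenses the flow the diff implements: the driver fetches an HDFS delegation token, serializes it into the SPARK_HDFS_TOKEN environment variable carried by executorEnvs, and each executor decodes it back into a UserGroupInformation before running its work as the submitting user. The object name TokenFlowSketch and the helpers serializedTokenForExecutors/runAs are illustrative only; the patch's real entry points are SparkHadoopUtil.createSerializedToken() and SparkHadoopUtil.runAsUser().

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

// Illustrative sketch of the delegation-token hand-off; names are hypothetical, not from the patch.
object TokenFlowSketch {
  val HDFS_TOKEN_KEY = "SPARK_HDFS_TOKEN"

  // Driver side: in a Kerberos-secured cluster, fetch an HDFS delegation token for the
  // current user and serialize it so it can travel through the executor environment.
  def serializedTokenForExecutors(conf: Configuration): Option[String] = {
    if (UserGroupInformation.isSecurityEnabled) {
      val fs = FileSystem.get(conf)
      val renewer = UserGroupInformation.getCurrentUser.getShortUserName
      Option(fs.getDelegationToken(renewer)).map(_.encodeToUrlString())
    } else {
      None
    }
  }

  // Executor side: rebuild a UGI for the submitting user, attach the decoded token, and run
  // the given work inside doAs so that HDFS calls authenticate with that token.
  def runAs[T](user: String, encodedToken: Option[String])(work: => T): T = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    encodedToken.foreach { s =>
      ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
      val token = new Token[TokenIdentifier]()
      token.decodeFromUrlString(s)
      ugi.addToken(token)
    }
    ugi.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = work
    })
  }
}

In this sketch the driver would place the returned string into executorEnvs (as SparkContext does above), and on the executor runAs(user, sys.env.get(HDFS_TOKEN_KEY)) { ... } would wrap start-up the same way runAsUser wraps run0 in StandaloneExecutorBackend.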