
Commit fde98a7

sryza authored and James Z.M. Gao committed
SPARK-1051. On YARN, executors don't doAs submitting user
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo.

Author: Sandy Ryza <[email protected]>

Closes apache#29 from sryza/sandy-spark-1051 and squashes the following commits:

708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
1 parent 856267c commit fde98a7

5 files changed: 25 additions, 10 deletions
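For readers skimming the diff: the patch makes the YARN ApplicationMaster run user code under the submitting user's Hadoop identity. A minimal, self-contained sketch of the UGI doAs pattern it builds on (the method name runAs and the object name are illustrative, not Spark code):

import java.security.PrivilegedExceptionAction

import scala.collection.JavaConversions._

import org.apache.hadoop.security.UserGroupInformation

object UgiSketch {
  // Run `body` as `desiredUser`, carrying the current user's tokens along.
  def runAs(desiredUser: String)(body: () => Unit): Unit = {
    // createRemoteUser builds a UGI with no credentials attached...
    val ugi = UserGroupInformation.createRemoteUser(desiredUser)
    // ...so copy the login user's delegation tokens across, or secure
    // HDFS access inside the doAs block will fail.
    for (token <- UserGroupInformation.getCurrentUser().getTokens()) {
      ugi.addToken(token)
    }
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = body()
    })
  }
}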

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Lines changed: 10 additions & 8 deletions

@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkContext, SparkException}
 
+import scala.collection.JavaConversions._
+
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do the doAs. It
-    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
-    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
-    // in SparkContext.
-    val currentUser = Option(System.getProperty("user.name")).
-      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+    if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
+      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
        def run: Unit = func()
       })
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
     }
   }
 
+  def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    for (token <- source.getTokens()) {
+      dest.addToken(token)
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
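With the explanatory comment removed from runAsUser, the guard it keeps is worth restating: SPARK_UNKNOWN_USER is the fallback when SPARK_USER was never set, and runAsUser simply skips the doAs in that case. A sketch of the call shape the ApplicationMaster diffs below adopt (the body is illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil

// Resolve the submitting user, falling back to the unknown-user marker;
// runAsUser runs the thunk directly when it sees that marker.
val sparkUser = Option(System.getenv("SPARK_USER"))
  .getOrElse(SparkContext.SPARK_UNKNOWN_USER)

SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  // work that should see the submitting user's identity and tokens
}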

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 5 additions & 1 deletion

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -180,7 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
        var successed = false
        try {
          // Copy
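A small Scala point in the hunk above: runAsUser is curried, and its second parameter list takes a () => Unit function value, so the call site must pass an explicit { () => ... } literal rather than a plain block. A toy stand-in for the signature (not Spark code):

// Curried method; the second parameter list is a function value.
def runAsUser(user: String)(func: () => Unit): Unit = func()

// The trailing block is a function literal, hence the explicit "() =>".
runAsUser("alice") { () =>
  println("this thunk is what ugi.doAs eventually invokes")
}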

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
Lines changed: 1 addition & 0 deletions

@@ -272,6 +272,7 @@ trait ClientBase extends Logging {
     ClientBase.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
 
     // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)
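One note on the added line: getShortUserName() applies Hadoop's auth-to-local rules, so on a Kerberized cluster a full principal collapses to its short login name before being exported. A sketch (the principal shown is illustrative):

import org.apache.hadoop.security.UserGroupInformation

// A principal such as "alice/[email protected]" maps to the short
// login name "alice"; that short name is what lands in SPARK_USER.
val shortName = UserGroupInformation.getCurrentUser().getShortUserName()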

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
Lines changed: 4 additions & 0 deletions

@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
+  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    dest.addCredentials(source.getCredentials())
+  }
+
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
   override def isYarnMode(): Boolean = { true }
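This override widens what gets copied: the base-class transferCredentials moves delegation tokens one at a time, while addCredentials transfers the whole Credentials bundle, which can also hold secret keys. A sketch of the two shapes (the user name is illustrative):

import org.apache.hadoop.security.UserGroupInformation

val source = UserGroupInformation.getCurrentUser()
val dest = UserGroupInformation.createRemoteUser("alice")

// Base class: delegation tokens only, one at a time.
// for (token <- source.getTokens()) dest.addToken(token)

// YARN override: the full Credentials bundle, i.e. delegation tokens
// plus any secret keys stored alongside them.
dest.addCredentials(source.getCredentials())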

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
Lines changed: 5 additions & 1 deletion

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 
@@ -68,6 +69,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     math.max(args.numWorkers * 2, 3))
 
   private var registered = false
+
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -152,7 +156,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
        var successed = false
        try {
          // Copy
