
Commit 5cb15c3

Marcelo Vanzin authored and Michael Gummelt committed
[SPARK-13478][YARN] Use real user when fetching delegation tokens.
The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user.

To fix it, just run the code that fetches the delegation token as the real logged in user.

Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#11358 from vanzin/SPARK-13478.

(cherry picked from commit c7fccb5)

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#16665 from vanzin/SPARK-13478_1.6.
1 parent 1055bb2 · commit 5cb15c3
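The crux of the change is the distinction UserGroupInformation draws between the current user and the real user. Below is a minimal sketch of the failing situation; it is not part of the patch, and the object name and principal names ("alice" proxying for "bob") are illustrative:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object ProxyUserSketch {
      def main(args: Array[String]): Unit = {
        // Assumes a prior Kerberos login as "alice"; both names are illustrative.
        val realUgi = UserGroupInformation.getLoginUser()   // alice: holds the Kerberos TGT
        val proxyUgi = UserGroupInformation.createProxyUser("bob", realUgi)

        proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
          override def run(): Unit = {
            val current = UserGroupInformation.getCurrentUser()  // bob: proxy user, no TGT
            // The Hive client fails here because `current` has no Kerberos
            // credentials; the patch wraps the token fetch in
            // current.getRealUser().doAs { ... } (the new doAsRealUser helper)
            // so it runs with alice's TGT instead.
          }
        })
      }
    }

For a non-proxy submission, getRealUser() returns null, which is why the helper falls back to the current user.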

3 files changed, +42 -12 lines changed


core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions
@@ -255,6 +255,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
     }
+
+    if (proxyUser != null && principal != null) {
+      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+    }
   }
 
   private def validateKillArguments(): Unit = {
@@ -520,6 +524,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
         |  --proxy-user NAME           User to impersonate when submitting the application.
+        |                              This argument does not work with --principal / --keytab.
         |
         |  --help, -h                  Show this help message and exit
         |  --verbose, -v               Print additional debug output

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 36 additions & 11 deletions
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.lang.reflect.UndeclaredThrowableException
 import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
@@ -126,7 +128,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
    */
   def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
     try {
-      obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+      obtainTokenForHiveMetastoreInner(conf)
     } catch {
       case e: ClassNotFoundException =>
         logInfo(s"Hive class not found $e")
@@ -141,8 +143,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
    * @param username the username of the principal requesting the delegating token.
    * @return a delegation token
    */
-  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
-      username: String): Option[Token[DelegationTokenIdentifier]] = {
+  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
+      Option[Token[DelegationTokenIdentifier]] = {
     val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
 
     // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -157,11 +159,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
     // Check for local metastore
     if (metastoreUri.nonEmpty) {
-      require(username.nonEmpty, "Username undefined")
       val principalKey = "hive.metastore.kerberos.principal"
       val principal = hiveConf.getTrimmed(principalKey, "")
       require(principal.nonEmpty, "Hive principal $principalKey undefined")
-      logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+      val currentUser = UserGroupInformation.getCurrentUser()
+      logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
+        s"$principal at $metastoreUri")
       val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
       val closeCurrent = hiveClass.getMethod("closeCurrent")
       try {
@@ -170,12 +173,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
           classOf[String], classOf[String])
         val getHive = hiveClass.getMethod("get", hiveConfClass)
 
-        // invoke
-        val hive = getHive.invoke(null, hiveConf)
-        val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
-        val hive2Token = new Token[DelegationTokenIdentifier]()
-        hive2Token.decodeFromUrlString(tokenStr)
-        Some(hive2Token)
+        doAsRealUser {
+          val hive = getHive.invoke(null, hiveConf)
+          val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
+            .asInstanceOf[String]
+          val hive2Token = new Token[DelegationTokenIdentifier]()
+          hive2Token.decodeFromUrlString(tokenStr)
+          Some(hive2Token)
+        }
       } finally {
         Utils.tryLogNonFatalError {
           closeCurrent.invoke(null)
@@ -186,6 +191,26 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       None
     }
   }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
+      realUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+    }
+  }
+
 }
 
 object YarnSparkHadoopUtil {
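The "for some reason" comment above has a concrete cause: UserGroupInformation.doAs rethrows only IOException, InterruptedException, RuntimeException, and Error directly, so any other checked exception escaping the action, such as the InvocationTargetException produced by the reflective Hive calls, comes back wrapped in an UndeclaredThrowableException. Here is a standalone sketch of that behavior; the object name and the simulated failure are illustrative, not from the patch:

    import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object UndeclaredThrowableSketch {
      def main(args: Array[String]): Unit = {
        val ugi = UserGroupInformation.getCurrentUser()
        try {
          ugi.doAs(new PrivilegedExceptionAction[Unit] {
            // Scala lets run() throw a checked exception it never declares.
            override def run(): Unit =
              throw new InvocationTargetException(new RuntimeException("simulated Hive failure"))
          })
        } catch {
          // The checked exception surfaces wrapped; doAsRealUser unwraps it
          // via e.getCause so callers see the original failure.
          case e: UndeclaredThrowableException => println(s"unwrapped: ${e.getCause}")
        }
      }
    }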

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -257,7 +257,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     hadoopConf.set("hive.metastore.uris", "http://localhost:0")
     val util = new YarnSparkHadoopUtil
     assertNestedHiveException(intercept[InvocationTargetException] {
-      util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+      util.obtainTokenForHiveMetastoreInner(hadoopConf)
     })
     // expect exception trapping code to unwind this hive-side exception
     assertNestedHiveException(intercept[InvocationTargetException] {
