
Commit dc2714d

Marcelo Vanzin authored and jerryshao committed
[SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.
Hive delegation tokens are only needed when the Spark driver has no access to the kerberos TGT. That happens only in two situations:

- when using a proxy user
- when using cluster mode without a keytab

This change modifies the Hive provider so that it only generates delegation tokens in those situations, and tweaks the YARN AM so that it makes the proper user visible to the Hive code when running with keytabs, so that the TGT can be used instead of a delegation token.

The effect of this change is that it's now possible to initialize multiple, non-concurrent SparkContext instances in the same JVM. Before, the second invocation would fail to fetch a new Hive delegation token, which could then make the second (or third, or...) application fail once the token expired. With this change, the TGT will be used to authenticate to the HMS instead.

This change also avoids polluting the current logged-in user's credentials when launching applications. The credentials are copied only when running applications as a proxy user. This makes it possible to implement SPARK-11035 later, where multiple threads might be launching applications, and each app should have its own set of credentials.

Tested by verifying HDFS and Hive access in the following scenarios:

- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances through its lifetime

Author: Marcelo Vanzin <[email protected]>

Closes #19509 from vanzin/SPARK-22290.
1 parent 6f1d0de commit dc2714d

11 files changed: 110 additions, 32 deletions
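As the commit message notes, the practical effect is that multiple non-concurrent SparkContext instances can now authenticate to a kerberized Hive metastore from the same JVM, because the TGT is used instead of a one-shot delegation token. A minimal sketch of that usage pattern (the app name, loop count, and job are illustrative; master and kerberos settings are assumed to come from spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: create and stop several SparkContexts sequentially in one JVM.
// Before this change the second (or later) context could fail once its cached Hive
// delegation token expired; with the TGT used instead, each context can authenticate
// to the HMS on its own.
object SequentialContextsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sequential-contexts-sketch")
    for (i <- 1 to 3) {
      val sc = new SparkContext(conf)
      println(s"run $i counted ${sc.parallelize(1 to 10).count()} elements")
      sc.stop()  // non-concurrent: stop the current context before creating the next
    }
  }
}
```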

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 13 additions & 4 deletions
@@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
+    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
+
+  def createSparkUser(): UserGroupInformation = {
     val user = Utils.getCurrentUserName()
-    logDebug("running as user: " + user)
+    logDebug("creating UGI for user: " + user)
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    ugi
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
+
+  def isProxyUser(ugi: UserGroupInformation): Boolean = {
+    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
+  }
+
 }
 
 object SparkHadoopUtil {
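The refactoring above splits UGI creation out of runAsSparkUser() so callers can hold on to the UGI and run several doAs blocks against it, which is what the YARN ApplicationMaster below now does. A hedged sketch of how the helpers compose (the printed messages are illustrative):

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch: reuse the UGI returned by createSparkUser() instead of handing a single
// closure to runAsSparkUser(), and query the new isProxyUser() helper.
object SparkUserSketch {
  def main(args: Array[String]): Unit = {
    val util = SparkHadoopUtil.get
    val ugi: UserGroupInformation = util.createSparkUser()

    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit =
        println(s"running as ${UserGroupInformation.getCurrentUser().getShortUserName}")
    })

    // The check the Hive provider and the YARN Client now rely on.
    println(s"proxy user? ${util.isProxyUser(ugi)}")
  }
}
```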

core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -56,7 +56,9 @@ private[security] class HBaseDelegationTokenProvider
     None
   }
 
-  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
     hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
   }
 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ private[spark] class HadoopDelegationTokenManager(
       hadoopConf: Configuration,
       creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
-      if (provider.delegationTokensRequired(hadoopConf)) {
+      if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
         provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ private[spark] trait HadoopDelegationTokenProvider {
    * Returns true if delegation tokens are required for this service. By default, it is based on
    * whether Hadoop security is enabled.
    */
-  def delegationTokensRequired(hadoopConf: Configuration): Boolean
+  def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
 
   /**
    * Obtain delegation tokens for this service and get the time of the next renewal.
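For a provider implementation, the change means delegationTokensRequired() now also receives the SparkConf, so a provider can consult Spark settings in addition to the Hadoop configuration. A hedged sketch of a provider written against the new signature; the trait is private[spark], so this would live in Spark's own source tree, and the service name and config key are made up for illustration:

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Hypothetical provider showing the updated two-argument check. The names, the
// spark.example.token.enabled key, and the no-op token logic are illustrative.
private[security] class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "example"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    // Providers can now consult Spark settings as well as the Hadoop configuration.
    sparkConf.getBoolean("spark.example.token.enabled", false)
  }

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Nothing to fetch in this sketch; None means "no renewal time to report".
    None
  }
}
```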

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -69,7 +69,9 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
     nextRenewalDate
   }
 
-  def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
     UserGroupInformation.isSecurityEnabled
   }

core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala

Lines changed: 17 additions & 3 deletions
@@ -31,7 +31,9 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.util.Utils
 
 private[security] class HiveDelegationTokenProvider
@@ -55,9 +57,21 @@ private[security] class HiveDelegationTokenProvider
     }
   }
 
-  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
+    // Delegation tokens are needed only when:
+    // - trying to connect to a secure metastore
+    // - either deploying in cluster mode without a keytab, or impersonating another user
+    //
+    // Other modes (such as client with or without keytab, or cluster mode with keytab) do not need
+    // a delegation token, since there's a valid kerberos TGT for the right user available to the
+    // driver, which is the only process that connects to the HMS.
+    val deployMode = sparkConf.get("spark.submit.deployMode", "client")
     UserGroupInformation.isSecurityEnabled &&
-      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
+      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty &&
+      (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) ||
+        (deployMode == "cluster" && !sparkConf.contains(KEYTAB)))
   }
 
   override def obtainDelegationTokens(
@@ -83,7 +97,7 @@ private[security] class HiveDelegationTokenProvider
 
     val hive2Token = new Token[DelegationTokenIdentifier]()
     hive2Token.decodeFromUrlString(tokenStr)
-    logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
+    logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
     creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
   }
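The new condition reads as a small decision table: fetch a token only for a secure metastore, and only when the driver cannot rely on a TGT of its own. A restatement of the predicate with plain inputs, plus the scenarios from the commit message (the metastore URI is a placeholder):

```scala
object HiveTokenDecisionSketch {
  // Mirrors delegationTokensRequired() with plain inputs so the logic can be eyeballed:
  //   securityEnabled -> UserGroupInformation.isSecurityEnabled
  //   metastoreUris   -> hive.metastore.uris (trimmed)
  //   proxyUser       -> SparkHadoopUtil.get.isProxyUser(currentUser)
  //   deployMode      -> spark.submit.deployMode (defaults to "client")
  //   hasKeytab       -> sparkConf.contains(KEYTAB)
  def hiveTokensRequired(
      securityEnabled: Boolean,
      metastoreUris: String,
      proxyUser: Boolean,
      deployMode: String,
      hasKeytab: Boolean): Boolean = {
    securityEnabled &&
      metastoreUris.nonEmpty &&
      (proxyUser || (deployMode == "cluster" && !hasKeytab))
  }

  def main(args: Array[String]): Unit = {
    val uris = "thrift://example-hms:9083"  // placeholder metastore URI
    // argument order: (securityEnabled, metastoreUris, proxyUser, deployMode, hasKeytab)
    assert(hiveTokensRequired(true, uris, true, "client", false))    // proxy user: token needed
    assert(hiveTokensRequired(true, uris, false, "cluster", false))  // cluster, no keytab: token needed
    assert(!hiveTokensRequired(true, uris, false, "cluster", true))  // cluster with keytab: TGT is used
    assert(!hiveTokensRequired(true, uris, false, "client", false))  // client mode: user's TGT is used
  }
}
```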

docs/running-on-yarn.md

Lines changed: 9 additions & 0 deletions
@@ -401,6 +401,15 @@ To use a custom metrics.properties for the application master and executors, upd
     Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master)
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.kerberos.relogin.period</code></td>
+  <td>1m</td>
+  <td>
+  How often to check whether the kerberos TGT should be renewed. This should be set to a value
+  that is shorter than the TGT renewal period (or the TGT lifetime if TGT renewal is not enabled).
+  The default value should be enough for most deployments.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.config.gatewayPath</code></td>
   <td>(none)</td>
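The new entry is an ordinary time-valued Spark setting, so it can be supplied like any other configuration value, for example via spark-submit --conf alongside --principal and --keytab. A hedged sketch of setting it programmatically (the 30s value and app name are arbitrary):

```scala
import org.apache.spark.SparkConf

object ReloginPeriodConfSketch {
  // Illustrative: lower the TGT relogin check interval from the 1m default. Keep the
  // value shorter than the TGT renewal period (or the TGT lifetime if renewal is disabled).
  def build(): SparkConf = new SparkConf()
    .setAppName("relogin-period-sketch")               // arbitrary app name
    .set("spark.yarn.kerberos.relogin.period", "30s")  // default: 1m
}
```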

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 55 additions & 14 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 import java.lang.reflect.InvocationTargetException
 import java.net.{Socket, URI, URL}
+import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
 import scala.collection.mutable.HashMap
@@ -28,6 +29,7 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -49,10 +51,7 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(
-    args: ApplicationMasterArguments,
-    client: YarnRMClient)
-  extends Logging {
+private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
 
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
@@ -62,6 +61,46 @@ private[spark] class ApplicationMaster(
     .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null
 
+  private val ugi = {
+    val original = UserGroupInformation.getCurrentUser()
+
+    // If a principal and keytab were provided, log in to kerberos, and set up a thread to
+    // renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
+    // of the TGT, use a configuration to define how often to check that a relogin is necessary.
+    // checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
+    val principal = sparkConf.get(PRINCIPAL).orNull
+    val keytab = sparkConf.get(KEYTAB).orNull
+    if (principal != null && keytab != null) {
+      UserGroupInformation.loginUserFromKeytab(principal, keytab)
+
+      val renewer = new Thread() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          while (true) {
+            TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
+            UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
+          }
+        }
+      }
+      renewer.setName("am-kerberos-renewer")
+      renewer.setDaemon(true)
+      renewer.start()
+
+      // Transfer the original user's tokens to the new user, since that's needed to connect to
+      // YARN. It also copies over any delegation tokens that might have been created by the
+      // client, which will then be transferred over when starting executors (until new ones
+      // are created by the periodic task).
+      val newUser = UserGroupInformation.getCurrentUser()
+      SparkHadoopUtil.get.transferCredentials(original, newUser)
+      newUser
+    } else {
+      SparkHadoopUtil.get.createSparkUser()
+    }
+  }
+
+  private val client = ugi.doAs(new PrivilegedExceptionAction[YarnRMClient]() {
+    def run: YarnRMClient = new YarnRMClient()
+  })
+
   // Default to twice the number of executors (twice the maximum number of executors if dynamic
   // allocation is enabled), with a minimum of 3.
 
@@ -201,6 +240,13 @@
   }
 
   final def run(): Int = {
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      def run: Unit = runImpl()
+    })
+    exitCode
+  }
+
+  private def runImpl(): Unit = {
     try {
       val appAttemptId = client.getAttemptId()
 
@@ -254,11 +300,6 @@
       }
     }
 
-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserApplication which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
-
     // If the credentials file config is present, we must periodically renew tokens. So create
     // a new AMDelegationTokenRenewer
     if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
@@ -284,6 +325,9 @@
       credentialRenewerThread.join()
     }
 
+    // Call this to force generation of secret so it gets populated into the Hadoop UGI.
+    val securityMgr = new SecurityManager(sparkConf)
+
     if (isClusterMode) {
       runDriver(securityMgr)
     } else {
@@ -297,7 +341,6 @@
           ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
           "Uncaught exception: " + e)
     }
-    exitCode
   }
 
   /**
@@ -775,10 +818,8 @@ object ApplicationMaster extends Logging {
         sys.props(k) = v
       }
     }
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClient)
-      System.exit(master.run())
-    }
+    master = new ApplicationMaster(amArgs)
+    System.exit(master.run())
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
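The AM change hinges on two UGI calls: loginUserFromKeytab() to establish a TGT for the configured principal, and checkTGTAndReloginFromKeytab() polled on a timer, since the UGI API does not expose the TGT's remaining lifetime. A stripped-down, hedged sketch of the same pattern outside the AM (the principal, keytab path, and 60-second period are placeholders):

```scala
import java.security.PrivilegedExceptionAction
import java.util.concurrent.TimeUnit

import org.apache.hadoop.security.UserGroupInformation

// Hedged sketch of the keytab login + doAs + periodic relogin-check pattern used by the AM.
object KeytabLoginSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders: a real deployment would take these from the PRINCIPAL / KEYTAB settings.
    UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")
    val ugi = UserGroupInformation.getCurrentUser()

    // checkTGTAndReloginFromKeytab() is a no-op while the ticket is still fresh, so a coarse
    // polling interval is enough; the AM reads it from spark.yarn.kerberos.relogin.period.
    val renewer = new Thread() {
      override def run(): Unit = {
        while (true) {
          TimeUnit.SECONDS.sleep(60)
          UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
        }
      }
    }
    renewer.setName("kerberos-renewer-sketch")
    renewer.setDaemon(true)
    renewer.start()

    // Work that talks to Hadoop services runs under the logged-in UGI, so the TGT
    // (rather than a delegation token) is what authenticates to the HMS.
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = println(s"working as ${UserGroupInformation.getCurrentUser()}")
    })
  }
}
```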

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 4 additions & 1 deletion
@@ -394,7 +394,10 @@ private[spark] class Client(
     if (credentials != null) {
       // Add credentials to current user's UGI, so that following operations don't need to use the
       // Kerberos tgt to get delegations again in the client side.
-      UserGroupInformation.getCurrentUser.addCredentials(credentials)
+      val currentUser = UserGroupInformation.getCurrentUser()
+      if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
+        currentUser.addCredentials(credentials)
+      }
       logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
     }

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 4 additions & 0 deletions
@@ -347,6 +347,10 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(Long.MaxValue)
 
+  private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("1m")
+
   // The list of cache-related config entries. This is used by Client and the AM to clean
   // up the environment so that these settings do not appear on the web UI.
   private[yarn] val CACHE_CONFIGS = Seq(
