@@ -57,8 +57,6 @@ private[spark] class MesosHadoopDelegationTokenManager(
5757
5858 private val principal : String = conf.get(config.PRINCIPAL ).orNull
5959
60- private val (secretFile : Option [String ], mode : String ) = getSecretFile(conf)
61-
6260 private var (tokens : Array [Byte ], timeOfNextRenewal : Long ) = {
6361 try {
6462 val creds = UserGroupInformation .getCurrentUser.getCredentials
@@ -68,41 +66,24 @@ private[spark] class MesosHadoopDelegationTokenManager(
6866 (SparkHadoopUtil .get.serialize(creds), rt)
6967 } catch {
7068 case e : Exception =>
71- logError(" Failed to initialize Hadoop delegation tokens\n " +
72- s " \t Pricipal: $principal\n\t mode: $mode\n\t secret file $secretFile\n\t Exception: $e" )
69+ logError(s " Failed to fetch Hadoop delegation tokens $e" )
7370 throw e
7471 }
7572 }
7673
77- scheduleTokenRenewal( )
74+ private val keytabFile : Option [ String ] = conf.get(config. KEYTAB )
7875
79- private def getSecretFile (conf : SparkConf ): (Option [String ], String ) = {
80- val keytab = conf.get(config.KEYTAB )
81- val tgt = Option (conf.getenv(SparkHadoopUtil .TICKET_CACHE_ENVVAR ))
82- val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
83- // if a keytab and a specific ticket cache is specified use the keytab and log the behavior
84- logWarning(s " Keytab and TGT were detected, using keytab, ${keytab.get}, " +
85- s " unset ${config.KEYTAB .key} to use TGT ( ${tgt.get}) " )
86- (keytab, " keytab" )
87- } else {
88- val m = if (keytab.isDefined) " keytab" else " tgt"
89- val sf = if (keytab.isDefined) keytab else tgt
90- (sf, m)
91- }
76+ scheduleTokenRenewal()
9277
93- if (principal == null ) {
94- require(mode == " tgt" , s " Must specify a principal when using a Keytab, was $principal" )
95- logInfo(s " Using ticket cache to fetch Hadoop delegation tokens " )
78+ private def scheduleTokenRenewal (): Unit = {
79+ if (keytabFile.isDefined) {
80+ require(principal != null , " Principal is required for Keytab-based authentication" )
81+ logInfo(s " Using keytab: ${keytabFile.get} and principal $principal" )
9682 } else {
97- logInfo(s " Using principal: $principal with mode and keytab $keytab " +
98- s " to fetch Hadoop delegation tokens " )
83+ logInfo(" Using ticket cache for Kerberos authentication, no token renewal. " )
84+ return
9985 }
10086
101- logDebug(s " secretFile is $secretFile" )
102- (secretFile, mode)
103- }
104-
105- private def scheduleTokenRenewal (): Unit = {
10687 def scheduleRenewal (runnable : Runnable ): Unit = {
10788 val remainingTime = timeOfNextRenewal - System .currentTimeMillis()
10889 if (remainingTime <= 0 ) {
@@ -118,7 +99,7 @@ private[spark] class MesosHadoopDelegationTokenManager(
11899 new Runnable {
119100 override def run (): Unit = {
120101 try {
121- val tokensBytes = getNewDelegationTokens
102+ val tokensBytes = getNewDelegationTokens()
122103 broadcastDelegationTokens(tokensBytes)
123104 } catch {
124105 case e : Exception =>
@@ -134,21 +115,12 @@ private[spark] class MesosHadoopDelegationTokenManager(
134115 }
135116
136117 private def getNewDelegationTokens (): Array [Byte ] = {
137- logInfo(s " Attempting to login to KDC with ${conf.get(config. PRINCIPAL ).orNull }" )
118+ logInfo(s " Attempting to login to KDC with principal ${principal }" )
138119 // Get new delegation tokens by logging in with a new UGI
139- // inspired by AMCredentialRenewer.scala:L174
140- val ugi = if (mode == " keytab" ) {
141- UserGroupInformation .loginUserFromKeytabAndReturnUGI(principal, secretFile.get)
142- } else {
143- // if the ticket cache is not explicitly defined, use the default
144- if (secretFile.isEmpty) {
145- UserGroupInformation .getCurrentUser
146- } else {
147- UserGroupInformation .getUGIFromTicketCache(secretFile.get, principal)
148- }
149- }
120+ // inspired by AMCredentialRenewer.scala:L174.
121+ // Don't protect against keytabFile being empty because it's guarded above.
122+ val ugi = UserGroupInformation .loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
150123 logInfo(" Successfully logged into KDC" )
151-
152124 val tempCreds = ugi.getCredentials
153125 val hadoopConf = SparkHadoopUtil .get.newConfiguration(conf)
154126 val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction [Long ] {
0 commit comments