@@ -126,11 +126,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
126126 sparkConf.getOption(" spark.yarn.principal" ).foreach { principal =>
127127 val keytab = sparkConf.get(" spark.yarn.keytab" )
128128
129+ def getRenewalInterval =
130+ math.max((0.75 * (getLatestValidity - System .currentTimeMillis())).toLong, 0L )
131+
129132 def scheduleRenewal (runnable : Runnable ) = {
130- // Latest validity can be -ve if the original tokens expired, and then the AM died.
131- val renewalInterval =
132- math.max((0.75 * (getLatestValidity - System .currentTimeMillis())).toLong, 0L )
133- logInfo(" Scheduling login from keytab in " + renewalInterval + " millis." )
133+ val renewalInterval = getRenewalInterval
134+ logInfo(s " Scheduling login from keytab in $renewalInterval millis. " )
134135 delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit .MILLISECONDS )
135136 }
136137
@@ -139,7 +140,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
139140 new Runnable {
140141 override def run (): Unit = {
141142 try {
142- renewCredentials (principal, keytab)
143+ writeNewTokensToHDFS (principal, keytab)
143144 } catch {
144145 case e : Exception =>
145146 logWarning(" Failed to write out new credentials to HDFS, will try again in an " +
@@ -150,11 +151,15 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
150151 scheduleRenewal(this )
151152 }
152153 }
154+ // If this is an AM restart, it is possible that the original tokens have expired, which
155+ // means we need to login immediately to get new tokens.
156+ if (getRenewalInterval == 0 ) writeNewTokensToHDFS(principal, keytab)
157+ // Schedule update of credentials
153158 scheduleRenewal(driverTokenRenewerRunnable)
154159 }
155160 }
156161
157- private def renewCredentials (principal : String , keytab : String ): Unit = {
162+ private def writeNewTokensToHDFS (principal : String , keytab : String ): Unit = {
158163 if (! loggedInViaKeytab) {
159164 // Keytab is copied by YARN to the working directory of the AM, so full path is
160165 // not needed.
@@ -179,8 +184,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
179184 delegationTokenRenewer.schedule(reloginRunnable, 6 , TimeUnit .HOURS )
180185 }
181186 val nns = getNameNodesToAccess(sparkConf)
182- val newCredentials = loggedInUGI.getCredentials
183- obtainTokensForNamenodes(nns, conf, newCredentials)
187+ obtainTokensForNamenodes(nns, conf, loggedInUGI.getCredentials)
184188 val remoteFs = FileSystem .get(conf)
185189 // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
186190 // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
@@ -201,7 +205,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
201205 val stream = Option (remoteFs.create(tempTokenPath, true ))
202206 try {
203207 stream.foreach { s =>
204- newCredentials .writeTokenStorageToStream(s)
208+ loggedInUGI.getCredentials .writeTokenStorageToStream(s)
205209 s.hflush()
206210 s.close()
207211 logInfo(s " Delegation Tokens written out successfully. Renaming file to $tokenPathStr" )
@@ -241,7 +245,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
241245 sparkConf.getOption(" spark.yarn.credentials.file" ).foreach { credentialsFile =>
242246 val credentialsFilePath = new Path (credentialsFile)
243247 val remoteFs = FileSystem .get(conf)
244-
245248 listCredentialsFilesSorted(remoteFs, credentialsFilePath)
246249 .lastOption.foreach { credentialsStatus =>
247250 val suffix = getSuffixForCredentialsPath(credentialsStatus)
@@ -250,7 +253,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
250253 val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
251254 lastCredentialsFileSuffix = suffix
252255 UserGroupInformation .getCurrentUser.addCredentials(newCredentials)
253-
254256 val totalValidity = getLatestValidity - credentialsStatus.getModificationTime
255257 val timeToRunRenewal =
256258 credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
0 commit comments