@@ -127,7 +127,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
127127 val keytab = sparkConf.get(" spark.yarn.keytab" )
128128
129129 def scheduleRenewal (runnable : Runnable ) = {
130- val renewalInterval = (0.75 * (getLatestValidity - System .currentTimeMillis())).toLong
130+ // Latest validity can be -ve if the original tokens expired, and then the AM died.
131+ val renewalInterval =
132+ math.max((0.75 * (getLatestValidity - System .currentTimeMillis())).toLong, 0L )
131133 logInfo(" Scheduling login from keytab in " + renewalInterval + " millis." )
132134 delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit .MILLISECONDS )
133135 }
@@ -180,6 +182,16 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
180182 val newCredentials = loggedInUGI.getCredentials
181183 obtainTokensForNamenodes(nns, conf, newCredentials)
182184 val remoteFs = FileSystem .get(conf)
185+ // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
186+ // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
187+ // and update the lastCredentialsFileSuffix.
188+ if (lastCredentialsFileSuffix == 0 ) {
189+ val credentialsPath = new Path (sparkConf.get(" spark.yarn.credentials.file" ))
190+ listCredentialsFilesSorted(remoteFs, credentialsPath)
191+ .lastOption.foreach { status =>
192+ lastCredentialsFileSuffix = getSuffixForCredentialsPath(status)
193+ }
194+ }
183195 val nextSuffix = lastCredentialsFileSuffix + 1
184196 val tokenPathStr =
185197 sparkConf.get(" spark.yarn.credentials.file" ) + " -" + nextSuffix
@@ -199,35 +211,43 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
199211 } finally {
200212 stream.foreach(_.close())
201213 }
202-
203214 lastCredentialsFileSuffix = nextSuffix
204215 }
205216
217+ private def listCredentialsFilesSorted (
218+ remoteFs : FileSystem ,
219+ credentialsFilePath : Path ): Array [FileStatus ] = {
220+ val fileStatuses = remoteFs.listStatus(credentialsFilePath.getParent,
221+ new PathFilter {
222+ override def accept (path : Path ): Boolean = {
223+ val name = path.getName
224+ name.startsWith(credentialsFilePath.getName) && ! name.endsWith(" .tmp" )
225+ }
226+ })
227+ Arrays .sort(fileStatuses, new Comparator [FileStatus ] {
228+ override def compare (o1 : FileStatus , o2 : FileStatus ): Int = {
229+ Longs .compare(o1.getModificationTime, o2.getModificationTime)
230+ }
231+ })
232+ fileStatuses
233+ }
234+
235+ private def getSuffixForCredentialsPath (credentialsStatus : FileStatus ): Int = {
236+ val fileName = credentialsStatus.getPath.getName
237+ fileName.substring(fileName.lastIndexOf(" -" ) + 1 ).toInt
238+ }
206239 override def updateCredentialsIfRequired (): Unit = {
207240 try {
208241 sparkConf.getOption(" spark.yarn.credentials.file" ).foreach { credentialsFile =>
209242 val credentialsFilePath = new Path (credentialsFile)
210243 val remoteFs = FileSystem .get(conf)
211- val stagingDirPath = new Path (remoteFs.getHomeDirectory, credentialsFilePath.getParent)
212- val fileStatuses =
213- remoteFs.listStatus(stagingDirPath,
214- new PathFilter {
215- override def accept (path : Path ): Boolean = {
216- val name = path.getName
217- name.startsWith(credentialsFilePath.getName) && ! name.endsWith(" .tmp" )
218- }
219- })
220- Arrays .sort(fileStatuses, new Comparator [FileStatus ] {
221- override def compare (o1 : FileStatus , o2 : FileStatus ): Int = {
222- Longs .compare(o1.getModificationTime, o2.getModificationTime)
223- }
224- })
225- fileStatuses.lastOption.foreach { credentialsStatus =>
226- val credentials = credentialsStatus.getPath
227- val suffix = credentials.getName.substring(credentials.getName.lastIndexOf(" -" ) + 1 ).toInt
244+
245+ listCredentialsFilesSorted(remoteFs, credentialsFilePath)
246+ .lastOption.foreach { credentialsStatus =>
247+ val suffix = getSuffixForCredentialsPath(credentialsStatus)
228248 if (suffix > lastCredentialsFileSuffix) {
229- logInfo(" Reading new delegation tokens from " + credentials.toString )
230- val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentials )
249+ logInfo(" Reading new delegation tokens from " + credentialsStatus.getPath )
250+ val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath )
231251 lastCredentialsFileSuffix = suffix
232252 UserGroupInformation .getCurrentUser.addCredentials(newCredentials)
233253
0 commit comments