@@ -55,7 +55,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   private var principal: String = null
   @volatile private var loggedInViaKeytab = false
   @volatile private var loggedInUGI: UserGroupInformation = null
-  @volatile private var lastCredentialsRefresh = 0l
+  @volatile private var lastCredentialsRefresh = 0L
   private lazy val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
       Utils.namedThreadFactory("Delegation Token Refresh Thread"))
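The refresh machinery above boils down to a single-threaded ScheduledExecutorService with a descriptive thread name. A minimal sketch of that setup, not the PR's code: the inline ThreadFactory stands in for Utils.namedThreadFactory, and the scheduled task is a placeholder.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

val tokenRefreshFactory: ThreadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "Delegation Token Refresh Thread")
    t.setDaemon(true) // the refresh thread should not keep the JVM alive on its own
    t
  }
}
val refresher = Executors.newSingleThreadScheduledExecutor(tokenRefreshFactory)

// Placeholder task: schedule a refresh attempt one hour from now.
refresher.schedule(new Runnable {
  override def run(): Unit = println("refresh credentials here")
}, 1, TimeUnit.HOURS)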
@@ -130,8 +130,22 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       newCredentials.writeTokenStorageToStream(stream)
       stream.hflush()
       stream.close()
-      remoteFs.delete(tokenPath, true)
+      // HDFS reads go through inodes now, so just doing a rename should be fine, but I
+      // could not find a clear explanation of when the old blocks on HDFS get deleted.
+      // Ideally we would not need this, but be defensive so we don't corrupt the
+      // credentials: create a marker file to show that an update is in progress - if the
+      // reader sees this file, it backs off and comes back later. Then delete the old
+      // token file and rename the temp file to the final name.
+      val updatingPath = new Path(stagingDirPath, "_UPDATING")
+      if (remoteFs.exists(updatingPath)) {
+        remoteFs.delete(updatingPath, true)
+      }
+      remoteFs.create(updatingPath).close()
+      if (remoteFs.exists(tokenPath)) {
+        remoteFs.delete(tokenPath, true)
+      }
       remoteFs.rename(tempTokenPath, tokenPath)
+      remoteFs.delete(updatingPath, true)
       delegationTokenRenewer.schedule(
         this, (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong,
         TimeUnit.MILLISECONDS)
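Pulled out of the diff context, the writer-side handoff amounts to the sketch below. It assumes the same Hadoop FileSystem handle and paths used in the hunk (remoteFs, stagingDirPath, tempTokenPath, tokenPath); the method name is invented for illustration and this is not the PR's code verbatim.

import org.apache.hadoop.fs.{FileSystem, Path}

def publishNewTokens(fs: FileSystem, stagingDir: Path,
    tempTokenPath: Path, tokenPath: Path): Unit = {
  val updatingPath = new Path(stagingDir, "_UPDATING")
  if (fs.exists(updatingPath)) {
    fs.delete(updatingPath, true)     // clear a stale marker left by an earlier failure
  }
  fs.create(updatingPath).close()     // readers that see this file back off and retry later
  if (fs.exists(tokenPath)) {
    fs.delete(tokenPath, true)        // drop the previous credentials file
  }
  fs.rename(tempTokenPath, tokenPath) // move the freshly written file into place
  fs.delete(updatingPath, true)       // update finished; readers may read again
}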
@@ -151,13 +165,19 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       val sparkStagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
       val stagingDirPath = new Path(remoteFs.getHomeDirectory, sparkStagingDir)
       val credentialsFilePath = new Path(stagingDirPath, credentialsFile)
+      // If an update is currently in progress, come back later!
+      if (remoteFs.exists(new Path(stagingDirPath, "_UPDATING"))) {
+        delegationTokenRenewer.schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
+      }
+      // Now check whether the credentials file exists; if it does, read the credentials from it.
       if (remoteFs.exists(credentialsFilePath)) {
         val status = remoteFs.getFileStatus(credentialsFilePath)
         val modTimeAtStart = status.getModificationTime
         if (modTimeAtStart > lastCredentialsRefresh) {
           val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsFilePath)
           val newStatus = remoteFs.getFileStatus(credentialsFilePath)
-          // File was updated after we started reading it, lets come back later and try to read it.
+          // The file was updated after we started reading it; let's come back later and try
+          // to read it again.
           if (newStatus.getModificationTime != modTimeAtStart) {
             delegationTokenRenewer
               .schedule(delegationTokenExecuterUpdaterThread, 1, TimeUnit.HOURS)
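The reader side, condensed into a self-contained sketch. It uses Hadoop's FileSystem and Credentials APIs and mirrors the names in the hunk (the "_UPDATING" marker, the modification-time check against lastCredentialsRefresh); the method signature and the scheduleRetry callback are invented for illustration, not taken from the PR.

import java.io.DataInputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.Credentials

def tryReadCredentials(
    fs: FileSystem,
    stagingDir: Path,
    credentialsFilePath: Path,
    lastCredentialsRefresh: Long,
    scheduleRetry: () => Unit): Option[Credentials] = {
  // A writer is mid-update: do not touch a possibly half-written file, come back later.
  if (fs.exists(new Path(stagingDir, "_UPDATING"))) {
    scheduleRetry()
    return None
  }
  if (!fs.exists(credentialsFilePath)) return None
  val modTimeAtStart = fs.getFileStatus(credentialsFilePath).getModificationTime
  if (modTimeAtStart <= lastCredentialsRefresh) return None // nothing new since last refresh
  val in: DataInputStream = fs.open(credentialsFilePath)
  val creds = new Credentials()
  try creds.readTokenStorageStream(in) finally in.close()
  // If the file was rewritten while we were reading it, discard the result and retry later.
  if (fs.getFileStatus(credentialsFilePath).getModificationTime != modTimeAtStart) {
    scheduleRetry()
    None
  } else {
    Some(creds)
  }
}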
@@ -177,7 +197,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         }
       }
     } catch {
-      // Since the file may get deleted while we are reading it,
+      // Since the file may get deleted while we are reading it, catch the exception and
+      // come back in an hour to try again.
       case e: Exception =>
         logWarning(
           "Error encountered while trying to update credentials, will try again in 1 hour", e)