@@ -126,6 +126,13 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   private[spark] override def scheduleLoginFromKeytab(): Unit = {
     sparkConf.getOption("spark.yarn.principal").foreach { principal =>
       val keytab = sparkConf.get("spark.yarn.keytab")
+
+      def scheduleRenewal(runnable: Runnable) = {
+        val renewalInterval = (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong
+        logInfo("Scheduling login from keytab in " + renewalInterval + " millis.")
+        delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+      }
+
       // This thread periodically runs on the driver to update the delegation tokens on HDFS.
       val driverTokenRenewerRunnable =
         new Runnable {
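The helper factored out above simply reschedules a Runnable at 75% of the time remaining until the current tokens stop being valid. A minimal standalone sketch of that scheduling pattern (the executor, validity value, and names here are illustrative, not taken from the patch):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object RenewalSchedulingSketch {
  // Single-threaded scheduler, analogous to the patch's delegationTokenRenewer.
  private val renewer: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

  /** Schedule `runnable` at 75% of the time remaining until `validUntilMs`. */
  def scheduleRenewal(runnable: Runnable, validUntilMs: Long): Unit = {
    val renewalInterval = (0.75 * (validUntilMs - System.currentTimeMillis())).toLong
    println(s"Scheduling renewal in $renewalInterval millis.")
    renewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
  }

  def main(args: Array[String]): Unit = {
    // Pretend the current tokens expire 10 seconds from now; renewal fires at ~7.5s.
    val validUntil = System.currentTimeMillis() + 10000L
    scheduleRenewal(new Runnable {
      override def run(): Unit = println("Renewing credentials now.")
    }, validUntil)
    Thread.sleep(8000)
    renewer.shutdown()
  }
}
```

Renewing well before expiry leaves headroom for a failed attempt to be retried before the current tokens actually lapse.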
@@ -139,23 +146,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
                 delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
                 return
             }
-            delegationTokenRenewer.schedule(
-              this, (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong,
-              TimeUnit.MILLISECONDS)
+            scheduleRenewal(this)
           }
         }
-      val timeToRenewal = (0.75 * (getLatestValidity - System.currentTimeMillis())).toLong
-      delegationTokenRenewer.schedule(
-        driverTokenRenewerRunnable, timeToRenewal, TimeUnit.MILLISECONDS)
+      scheduleRenewal(driverTokenRenewerRunnable)
     }
   }
 
   private def renewCredentials(principal: String, keytab: String): Unit = {
     if (!loggedInViaKeytab) {
       // Keytab is copied by YARN to the working directory of the AM, so full path is
       // not needed.
+      logInfo(s"Attempting to login to KDC using principal: $principal")
       loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
         principal, keytab)
+      logInfo("Successfully logged into KDC.")
       loggedInViaKeytab = true
     }
     val nns = getNameNodesToAccess(sparkConf)
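The login path above relies on Hadoop's UserGroupInformation API: authenticate against the KDC from the keytab, then fetch fresh delegation tokens inside a doAs block. A hedged sketch of that flow under those assumptions (the "yarn" renewer string, the method name, and the use of the default filesystem are placeholders for illustration, not the patch's code):

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object KeytabLoginSketch {
  def obtainFreshTokens(principal: String, keytab: String, conf: Configuration): Credentials = {
    // Re-authenticate against the KDC; returns a UGI backed by the fresh TGT.
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    val creds = new Credentials()
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // Ask the filesystem for new delegation tokens on behalf of the renewer.
        FileSystem.get(conf).addDelegationTokens("yarn", creds)
      }
    })
    creds
  }
}
```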
@@ -167,13 +172,16 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       sparkConf.get("spark.yarn.credentials.file") + "-" + nextSuffix
     val tokenPath = new Path(tokenPathStr)
     val tempTokenPath = new Path(tokenPathStr + ".tmp")
+    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val stream = Option(remoteFs.create(tempTokenPath, true))
     try {
       stream.foreach { s =>
         newCredentials.writeTokenStorageToStream(s)
         s.hflush()
         s.close()
+        logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
         remoteFs.rename(tempTokenPath, tokenPath)
+        logInfo("Delegation token file rename complete.")
       }
     } finally {
       stream.foreach(_.close())
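Writing to a `.tmp` file and then renaming it into place is what keeps executors from ever reading a half-written token file. A self-contained sketch of that pattern, assuming a caller-supplied FileSystem and Credentials (the helper name is invented for illustration):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.Credentials

object AtomicTokenWriteSketch {
  def writeTokensAtomically(fs: FileSystem, creds: Credentials, finalPath: Path): Unit = {
    val tempPath = new Path(finalPath.toString + ".tmp")
    val out = fs.create(tempPath, true) // overwrite any stale temp file
    try {
      creds.writeTokenStorageToStream(out) // Credentials serializes itself to the stream
      out.hflush()
    } finally {
      out.close()
    }
    // Rename within an HDFS directory is atomic, so a reader sees either the
    // previous file or the complete new one, never a partial write.
    fs.rename(tempPath, finalPath)
  }
}
```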
@@ -205,16 +213,23 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         val credentials = credentialsStatus.getPath
         val suffix = credentials.getName.substring(credentials.getName.lastIndexOf("-") + 1).toInt
         if (suffix > lastCredentialsFileSuffix) {
+          logInfo("Reading new delegation tokens from " + credentials.toString)
           val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentials)
+          lastCredentialsFileSuffix = suffix
           UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+
           val totalValidity = getLatestValidity - credentialsStatus.getModificationTime
           val timeToRunRenewal =
             credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
           val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
-          delegationTokenRenewer.schedule(executorUpdaterRunnable,
-            timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+          logInfo("Updated delegation tokens, will check for new tokens in " +
+            timeFromNowToRenewal + " millis")
+          delegationTokenRenewer.schedule(
+            executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
         } else {
           // Check every hour to see if new credentials arrived.
+          logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+            "tokens yet, will check again in an hour.")
           delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
         }
       }
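On the executor side, the updater keys off the numeric suffix in the credentials file name and schedules its next check at 80% of the validity window, measured from the file's modification time. A small sketch of that arithmetic with made-up example values:

```scala
object TokenUpdateMathSketch {
  def main(args: Array[String]): Unit = {
    // The driver writes files named <base>-<suffix>; executors track the largest suffix seen.
    val fileName = "credentials-3"
    val suffix = fileName.substring(fileName.lastIndexOf("-") + 1).toInt // 3

    // Next check fires at 80% of the validity window, counted from when the file was written.
    val modificationTime = System.currentTimeMillis() - 60 * 60 * 1000L // written an hour ago
    val latestValidity = modificationTime + 24 * 60 * 60 * 1000L        // valid for 24 hours
    val totalValidity = latestValidity - modificationTime               // 24 hours
    val timeToRunRenewal = modificationTime + (0.8 * totalValidity).toLong
    val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
    println(s"Suffix $suffix; next token check in $timeFromNowToRenewal millis " +
      "(~18.2 hours for these example values)")
  }
}
```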
@@ -223,8 +238,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       // Since the file may get deleted while we are reading it, catch the Exception and come
       // back in an hour to try again
       case e: Exception =>
-        logWarning(
-          "Error encountered while trying to update credentials, will try again in 1 hour", e)
+        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
         delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
     }
   }