@@ -58,7 +58,6 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
     Utils.namedThreadFactory("Delegation Token Refresh Thread"))
 
   private var loggedInViaKeytab = false
-  private var loggedInUGI: UserGroupInformation = null
   var driverActor: ActorSelection = null
 
   private lazy val hadoopUtil = YarnSparkHadoopUtil.get
@@ -74,9 +73,17 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
     val keytab = sparkConf.get("spark.yarn.keytab")
 
     def getRenewalInterval = {
+      import scala.concurrent.duration._
       val credentials = UserGroupInformation.getCurrentUser.getCredentials
-      math.max((0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
-        System.currentTimeMillis())).toLong, 0L)
+      val interval = (0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
+        System.currentTimeMillis())).toLong
+      // If the computed interval is 6 hours or less, force a renewal immediately. This
+      // avoids tokens with very little remaining validity being used on AM restart.
+      if ((interval millis).toHours <= 6) {
+        0L
+      } else {
+        interval
+      }
     }
 
     def scheduleRenewal(runnable: Runnable) = {
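
Taken in isolation, the renewal math above works out as follows. A minimal self-contained sketch, assuming getLatestTokenValidity returns the earliest token expiry in epoch milliseconds (the helper and variable names here are hypothetical):

    import scala.concurrent.duration._

    // Hypothetical standalone version of getRenewalInterval. latestValidity is
    // assumed to be the earliest expiry (epoch millis) among the current tokens.
    def renewalDelayMillis(latestValidity: Long, nowMillis: Long): Long = {
      val interval = (0.75 * (latestValidity - nowMillis)).toLong
      // If the computed delay is 6 hours or less, renew immediately so a
      // restarted AM never keeps running on nearly-expired tokens.
      if (interval.millis.toHours <= 6) 0L else interval
    }

    // Example: a token valid for 24 more hours is renewed after ~18 hours.
    val now = System.currentTimeMillis()
    assert(renewalDelayMillis(now + 24.hours.toMillis, now) > 0)
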
@@ -89,23 +96,32 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
     val driverTokenRenewerRunnable =
       new Runnable {
         override def run(): Unit = {
+          var wroteNewFiles = false
           try {
             writeNewTokensToHDFS(principal, keytab)
+            wroteNewFiles = true
+            cleanupOldFiles()
           } catch {
             case e: Exception =>
-              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
-              return
+              // If the exception occurred while deleting old files, ignore it; cleanup
+              // will be retried the next time new tokens are written. Otherwise,
+              // reschedule for an hour later so new tokens get written out.
+              if (!wroteNewFiles) {
+                logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+                  "hour! If this happens too often tasks will fail.", e)
+                delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+                return
+              } else {
+                logWarning("Error while attempting to clean up old delegation token files. " +
+                  "Cleanup will be reattempted the next time new tokens are written.")
+              }
           }
-          cleanupOldFiles()
           scheduleRenewal(this)
         }
       }
-    // If this is an AM restart, it is possible that the original tokens have expired, which
-    // means we need to login immediately to get new tokens.
-    if (getRenewalInterval == 0) writeNewTokensToHDFS(principal, keytab)
-    // Schedule update of credentials
+    // Schedule update of credentials. This handles the case of updating the tokens right
+    // now as well, since the renewal interval will be 0 and the thread will get scheduled
+    // immediately.
     scheduleRenewal(driverTokenRenewerRunnable)
 
   }
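
The runnable above is a self-rescheduling task. A compact sketch of the same pattern under stated assumptions, with doWork and nextDelayMillis as stand-ins for writeNewTokensToHDFS/cleanupOldFiles and getRenewalInterval:

    import java.util.concurrent.{Executors, TimeUnit}

    val scheduler = Executors.newSingleThreadScheduledExecutor()

    def doWork(): Unit = ()                        // stand-in; assumed to throw on failure
    def nextDelayMillis(): Long = 60L * 60 * 1000  // stand-in; recomputed before each run

    val task: Runnable = new Runnable {
      override def run(): Unit = {
        try {
          doWork()
        } catch {
          case e: Exception =>
            // On failure, fall back to a fixed one-hour retry instead of the normal delay.
            scheduler.schedule(this, 1, TimeUnit.HOURS)
            return
        }
        scheduler.schedule(this, nextDelayMillis(), TimeUnit.MILLISECONDS)
      }
    }
    scheduler.schedule(task, nextDelayMillis(), TimeUnit.MILLISECONDS)

A zero initial delay produces an immediate first run, which is how the rewritten code covers the AM-restart case without the explicit writeNewTokensToHDFS call that was removed.
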
@@ -135,34 +151,20 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
       // Keytab is copied by YARN to the working directory of the AM, so full path is
       // not needed.
       logInfo(s"Attempting to login to KDC using principal: $principal")
-      loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
-        principal, keytab)
+      UserGroupInformation.loginUserFromKeytab(principal, keytab)
       logInfo("Successfully logged into KDC.")
       loggedInViaKeytab = true
-      // Not exactly sure when HDFS re-logs in, be safe and do it ourselves.
-      // Periodically check and relogin this keytab. The UGI will take care of not relogging in
-      // if it is not necessary to relogin.
-      val reloginRunnable = new Runnable {
-        override def run(): Unit = {
-          try {
-            loggedInUGI.checkTGTAndReloginFromKeytab()
-          } catch {
-            case e: Exception =>
-              logError("Error while attempting tp relogin to KDC", e)
-          }
-        }
-      }
-      delegationTokenRenewer.schedule(reloginRunnable, 6, TimeUnit.HOURS)
     }
     val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf)
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, loggedInUGI.getCredentials)
+    hadoopUtil.obtainTokensForNamenodes(
+      nns, hadoopConf, UserGroupInformation.getCurrentUser.getCredentials)
     val remoteFs = FileSystem.get(hadoopConf)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
     if (lastCredentialsFileSuffix == 0) {
       val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
-      SparkHadoopUtil.get.listFilesSorted(
+      hadoopUtil.listFilesSorted(
         remoteFs, credentialsPath.getParent, credentialsPath.getName, ".tmp")
         .lastOption.foreach { status =>
           lastCredentialsFileSuffix = getSuffixForCredentialsPath(status)
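
A condensed sketch of why the stored loggedInUGI field is no longer needed: loginUserFromKeytab updates the process-wide login user in place, so fresh credentials can always be read back through getCurrentUser, and Hadoop's RPC layer re-logs in from the keytab as needed, which is why the manual relogin thread above is removed (the principal and keytab values below are placeholders):

    import org.apache.hadoop.security.UserGroupInformation

    // Placeholder values; in the diff these come from spark.yarn.principal/keytab.
    val principal = "am/_HOST@EXAMPLE.COM"
    val keytab = "am.keytab"

    // Static login: mutates UserGroupInformation's current user rather than
    // returning a separate UGI that must be stored and managed.
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
    val creds = UserGroupInformation.getCurrentUser.getCredentials
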
@@ -179,9 +181,10 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
     stream.foreach { s =>
       val baos = new ByteArrayOutputStream()
       val dataOutputStream = new DataOutputStream(baos)
-      loggedInUGI.getCredentials.writeTokenStorageToStream(dataOutputStream)
+      val credentials = UserGroupInformation.getCurrentUser.getCredentials
+      credentials.writeTokenStorageToStream(dataOutputStream)
       dataOutputStream.close()
-      loggedInUGI.getCredentials.writeTokenStorageToStream(s)
+      credentials.writeTokenStorageToStream(s)
       s.hflush()
       s.close()
       logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
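
The token-serialization step above in isolation; writeTokenStorageToStream is Hadoop's standard Credentials API for this, and the byte-array copy mirrors the diff (a minimal sketch, with stream setup omitted):

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import org.apache.hadoop.security.UserGroupInformation

    // Serialize the current user's delegation tokens into a byte array.
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
    val baos = new ByteArrayOutputStream()
    val out = new DataOutputStream(baos)
    credentials.writeTokenStorageToStream(out)
    out.close()
    val serialized: Array[Byte] = baos.toByteArray
    // The diff also writes the same credentials directly to the HDFS output
    // stream and calls hflush() so the data is visible to readers immediately.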