@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.io._
 import java.net.URI
 import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
 import java.util.concurrent.{TimeUnit, ThreadFactory, Executors}
 import java.util.regex.Matcher
 import java.util.regex.Pattern
@@ -52,6 +52,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   private var keytabFile: Option[String] = None
   private var loginPrincipal: Option[String] = None
   private val loggedInViaKeytab = new AtomicBoolean(false)
+  private val loggedInUGI = new AtomicReference[UserGroupInformation](null)

   override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     dest.addCredentials(source.getCredentials())
@@ -131,11 +132,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       }).scheduleWithFixedDelay(new Runnable {
         override def run(): Unit = {
           if (!loggedInViaKeytab.get()) {
-            loginUserFromKeytab(principal, tempDir.getAbsolutePath + Path.SEPARATOR + keytab)
+            loggedInUGI.set(UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+              principal, tempDir.getAbsolutePath + Path.SEPARATOR + keytab))
             loggedInViaKeytab.set(true)
           }
           val nns = getNameNodesToAccess(sparkConf) + remoteKeytabPath
-          val newCredentials = new Credentials()
+          val newCredentials = loggedInUGI.get().getCredentials
           obtainTokensForNamenodes(nns, conf, newCredentials)
           // Now write this out via Akka to executors.
           val outputStream = new ByteArrayOutputStream()
0 commit comments