@@ -24,9 +24,9 @@ import java.util.UUID
2424
2525import scala .collection .JavaConversions ._
2626import scala .collection .mutable .{ArrayBuffer , HashMap , ListBuffer , Map }
27- import scala .util .{Random , Try , Success , Failure }
27+ import scala .util .{Try , Success , Failure }
2828
29- import com .google .common .base .Objects
29+ import com .google .common .base .{ Preconditions , Objects }
3030
3131import org .apache .hadoop .io .DataOutputBuffer
3232import org .apache .hadoop .conf .Configuration
@@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
4343import org .apache .hadoop .yarn .conf .YarnConfiguration
4444import org .apache .hadoop .yarn .util .Records
4545
46- import org .apache .spark .{Logging , SecurityManager , SparkConf , SparkContext , SparkException }
4746import org .apache .spark .deploy .SparkHadoopUtil
47+ import org .apache .spark .{Logging , SecurityManager , SparkConf , SparkContext , SparkException }
4848import org .apache .spark .util .Utils
4949
5050private [spark] class Client (
@@ -67,10 +67,7 @@ private[spark] class Client(
6767 private val executorMemoryOverhead = args.executorMemoryOverhead // MB
6868 private val distCacheMgr = new ClientDistributedCacheManager ()
6969 private val isClusterMode = args.isClusterMode
70-
7170 private var loginFromKeytab = false
72- private var keytabFileName : String = null
73-
7471
7572 def stop (): Unit = yarnClient.stop()
7673
@@ -221,10 +218,8 @@ private[spark] class Client(
221218 // and add them as local resources to the application master.
222219 val fs = FileSystem .get(hadoopConf)
223220 val dst = new Path (fs.getHomeDirectory(), appStagingDir)
224- val nns =
225- SparkHadoopUtil .get.asInstanceOf [YarnSparkHadoopUtil ].getNameNodesToAccess(sparkConf) + dst
226- SparkHadoopUtil .get.asInstanceOf [YarnSparkHadoopUtil ].
227- obtainTokensForNamenodes(nns, hadoopConf, credentials)
221+ val nns = YarnSparkHadoopUtil .get.getNameNodesToAccess(sparkConf) + dst
222+ YarnSparkHadoopUtil .get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
228223
229224 val replication = sparkConf.getInt(" spark.yarn.submit.file.replication" ,
230225 fs.getDefaultReplication(dst)).toShort
@@ -249,8 +244,8 @@ private[spark] class Client(
249244 val destinationPath = copyFileToRemote(dst, localPath, replication)
250245 val destFs = FileSystem .get(destinationPath.toUri(), hadoopConf)
251246 distCacheMgr.addResource(
252- destFs, hadoopConf, destinationPath, localResources, LocalResourceType .FILE , keytabFileName,
253- statCache, appMasterOnly = true )
247+ destFs, hadoopConf, destinationPath, localResources, LocalResourceType .FILE ,
248+ sparkConf.get( " spark.yarn.keytab " ), statCache, appMasterOnly = true )
254249 }
255250
256251 /**
@@ -334,8 +329,11 @@ private[spark] class Client(
334329 env(" SPARK_YARN_STAGING_DIR" ) = stagingDir
335330 env(" SPARK_USER" ) = UserGroupInformation .getCurrentUser().getShortUserName()
336331 if (loginFromKeytab) {
337- env(" SPARK_PRINCIPAL" ) = args.principal
338- env(" SPARK_KEYTAB" ) = keytabFileName
332+ val remoteFs = FileSystem .get(hadoopConf)
333+ val stagingDirPath = new Path (remoteFs.getHomeDirectory, stagingDir)
334+ val credentialsFile = " credentials-" + UUID .randomUUID().toString
335+ sparkConf.set(
336+ " spark.yarn.credentials.file" , new Path (stagingDirPath, credentialsFile).toString)
339337 }
340338
341339 // Set the environment variables to be passed on to the executors.
@@ -573,13 +571,13 @@ private[spark] class Client(
573571 val f = new File (args.keytab)
574572 // Generate a file name that can be used for the keytab file, that does not conflict
575573 // with any user file.
576- keytabFileName = f.getName + " -" + UUID .randomUUID().toString
574+ val keytabFileName = f.getName + " -" + UUID .randomUUID().toString
577575 val ugi = UserGroupInformation .loginUserFromKeytabAndReturnUGI(args.principal, args.keytab)
578576 credentials = ugi.getCredentials
579577 loginFromKeytab = true
580- val credentialsFile = " credentials- " + UUID .randomUUID().toString
581- sparkConf.set(" spark.yarn.credentials.file " , credentialsFile )
582- logInfo(" Successfully logged into Kerberos ." )
578+ sparkConf.set( " spark.yarn.keytab " , keytabFileName)
579+ sparkConf.set(" spark.yarn.principal " , args.principal )
580+ logInfo(" Successfully logged into the KDC ." )
583581 } else {
584582 credentials = UserGroupInformation .getCurrentUser.getCredentials
585583 }
0 commit comments