Commit b4cb917

Send keytab to AM via DistributedCache rather than directly via HDFS
1 parent 0985b4e commit b4cb917
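
In short: the client previously uploaded the keytab to the HDFS staging directory and the AM pulled it back down with raw FileSystem calls; the client now registers the staged keytab with the YARN distributed cache as an AM-only resource, so the NodeManager localizes it into the AM container's working directory and the AM can log in from it by bare file name.

The sketch below shows the underlying YARN mechanism this relies on: describing a file already uploaded to HDFS as a LocalResource record. The helper name keytabAsLocalResource is hypothetical; in the actual diff this bookkeeping is done by Spark's distCacheMgr.addResource call.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

// Hypothetical helper: describe an HDFS file as a YARN LocalResource so the
// NodeManager downloads it into the container's working directory at launch.
def keytabAsLocalResource(fs: FileSystem, dest: Path): LocalResource = {
  val status = fs.getFileStatus(dest)
  val resource = Records.newRecord(classOf[LocalResource])
  resource.setResource(ConverterUtils.getYarnUrlFromPath(dest))
  resource.setSize(status.getLen)
  resource.setTimestamp(status.getModificationTime)
  resource.setType(LocalResourceType.FILE)
  // APPLICATION visibility keeps the localized file private to this application's containers.
  resource.setVisibility(LocalResourceVisibility.APPLICATION)
  resource
}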

2 files changed: +26 −50 lines

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 20 additions & 36 deletions
@@ -21,6 +21,7 @@ import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.nio.file.Files
+import java.util.UUID
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
@@ -222,8 +223,10 @@ private[spark] class Client(
     // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = getNameNodesToAccess(sparkConf) + dst
-    obtainTokensForNamenodes(nns, hadoopConf, credentials)
+    val nns =
+      SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil].getNameNodesToAccess(sparkConf) + dst
+    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil].
+      obtainTokensForNamenodes(nns, hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
       fs.getDefaultReplication(dst)).toShort
@@ -240,6 +243,20 @@ private[spark] class Client(
         "for alternatives.")
     }
 
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    if (loginFromKeytab) {
+      val fs = FileSystem.get(hadoopConf)
+      val stagingDirPath = new Path(fs.getHomeDirectory, appStagingDir)
+      val localUri = new URI(args.keytab)
+      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
+      val destinationPath = new Path(stagingDirPath, keytabFileName)
+      copyFileToRemote(destinationPath, localPath, replication)
+      distCacheMgr.addResource(
+        fs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE, keytabFileName,
+        statCache, appMasterOnly = true)
+    }
+
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -320,22 +337,11 @@ private[spark] class Client(
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
-    // If we logged in from keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
     if (loginFromKeytab) {
-      val fs = FileSystem.get(hadoopConf)
-      val stagingDirPath = new Path(fs.getHomeDirectory, stagingDir)
-      val localUri = new URI(args.keytab)
-      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
-      val destinationPath = new Path(stagingDirPath, keytabFileName)
-      val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
-        fs.getDefaultReplication(destinationPath)).toShort
-      copyFileToRemote(destinationPath, localPath, replication)
       env("SPARK_PRINCIPAL") = args.principal
       env("SPARK_KEYTAB") = keytabFileName
     }
 
-
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
@@ -571,7 +577,7 @@ private[spark] class Client(
     val f = new File(args.keytab)
     // Generate a file name that can be used for the keytab file, that does not conflict
     // with any user file.
-    keytabFileName = f.getName + "-" + System.currentTimeMillis()
+    keytabFileName = f.getName + "-" + UUID.randomUUID()
     val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(args.principal, args.keytab)
     credentials = ugi.getCredentials
     loginFromKeytab = true
@@ -903,28 +909,6 @@ object Client extends Logging {
   private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
     YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
 
-  /**
-   * Get the list of namenodes the user may access.
-   */
-  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil].getNameNodesToAccess(sparkConf)
-  }
-
-  private[yarn] def getTokenRenewer(conf: Configuration): String = {
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil].getTokenRenewer(conf)
-  }
-
-  /**
-   * Obtains tokens for the namenodes passed in and adds them to the credentials.
-   */
-  private def obtainTokensForNamenodes(
-      paths: Set[Path],
-      conf: Configuration,
-      creds: Credentials): Unit = {
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
-      .obtainTokensForNamenodes(paths, conf, creds)
-  }
-
   /**
    * Return whether the two file systems are the same.
    */
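
Two details in the Client.scala diff worth calling out: the resource is added with appMasterOnly = true, so only the AM container localizes the keytab (executors never see it), and the staged file name now carries a UUID suffix instead of a timestamp, so two submissions made in the same millisecond cannot collide. A minimal sketch of that rename, with a placeholder local path:

import java.io.File
import java.util.UUID

val f = new File("/path/to/user.keytab") // placeholder: args.keytab in the real code
// e.g. "user.keytab-1b4e28ba-2fa1-11d2-883f-0016d3cca427"
val keytabFileName = f.getName + "-" + UUID.randomUUID()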

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 6 additions & 14 deletions
@@ -100,18 +100,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   private[spark] override def scheduleLoginFromKeytab(
       callback: (String) => Unit): Unit = {
     if (principal != null) {
-      val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
-      val remoteFs = FileSystem.get(conf)
-      val remoteKeytabPath = new Path(
-        remoteFs.getHomeDirectory, stagingDir + Path.SEPARATOR + keytab)
-      val localFS = FileSystem.getLocal(conf)
-      // At this point, SparkEnv is likely no initialized, so create a dir, put the keytab there.
-      val tempDir = Utils.createTempDir()
-      Utils.chmod700(tempDir)
-      val localURI = new URI(tempDir.getAbsolutePath + Path.SEPARATOR + keytab)
-      val qualifiedURI = new URI(localFS.makeQualified(new Path(localURI)).toString)
-      FileUtil.copy(
-        remoteFs, remoteKeytabPath, localFS, new Path(qualifiedURI), false, false, conf)
       // Get the current credentials, find out when they expire.
       val creds = {
         if (loggedInUGI == null) {
@@ -131,13 +119,17 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       new Runnable {
         override def run(): Unit = {
           if (!loggedInViaKeytab) {
+            // Keytab is copied by YARN to the working directory of the AM, so full path is
+            // not needed.
             loggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
-              principal, tempDir.getAbsolutePath + Path.SEPARATOR + keytab)
+              principal, keytab)
             loggedInViaKeytab = true
           }
-          val nns = getNameNodesToAccess(sparkConf) + remoteKeytabPath
+          val nns = getNameNodesToAccess(sparkConf)
           val newCredentials = loggedInUGI.getCredentials
           obtainTokensForNamenodes(nns, conf, newCredentials)
+          val remoteFs = FileSystem.get(conf)
+          val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
           val tokenPath = new Path(remoteFs.getHomeDirectory, stagingDir + Path.SEPARATOR +
             "credentials - " + System.currentTimeMillis())
           val stream = remoteFs.create(tokenPath, true)