182 changes: 108 additions & 74 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,12 @@

 package org.apache.spark.deploy.yarn

-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
+  OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
-import java.util.UUID
+import java.util.{Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.JavaConversions._
@@ -30,6 +31,7 @@ import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}
 import scala.util.control.NonFatal

+import com.google.common.base.Charsets.UTF_8
 import com.google.common.base.Objects
 import com.google.common.io.Files

@@ -278,20 +280,6 @@ private[spark] class Client(
         "for alternatives.")
     }

-    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
-    if (loginFromKeytab) {
-      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
-        " via the YARN Secure Distributed Cache.")
-      val localUri = new URI(args.keytab)
-      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
-      val destinationPath = copyFileToRemote(dst, localPath, replication)
-      val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
-      distCacheMgr.addResource(
-        destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
-        sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -303,6 +291,58 @@ private[spark] class Client(
       }
     }

+    /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val localURI = new URI(path.trim())
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(dst, localPath, replication)
+          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+          distCacheMgr.addResource(
+            destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, path.trim())
+      }
+    }
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied " +
+        "over to the AM via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(args.keytab,
+        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }

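Note (not part of the diff): the helper's 2-tuple contract is easiest to see with concrete calls. A minimal sketch, with hypothetical paths:

```scala
// Hypothetical call sites illustrating distribute()'s return values:
val (isLocal1, link1) = distribute("hdfs:///user/alice/app.jar")
// -> (false, "app.jar"): copied/cached; the link name defaults to the file name.

val (isLocal2, link2) = distribute("hdfs:///user/alice/app.jar")
// -> (false, null): the same URI was already added to the distributed cache.

val (isLocal3, link3) = distribute("local:/opt/libs/dep.jar")
// -> (true, "local:/opt/libs/dep.jar"): "local:" URIs are never distributed.
```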
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -315,33 +355,17 @@
       (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
       (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
       ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, _localPath, confKey) =>
-      val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (!localPath.isEmpty()) {
-        val localURI = new URI(localPath)
-        if (localURI.getScheme != LOCAL_SCHEME) {
-          if (addDistributedUri(localURI)) {
-            val src = getQualifiedLocalPath(localURI, hadoopConf)
-            val destPath = copyFileToRemote(dst, src, replication)
-            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-            distCacheMgr.addResource(destFs, hadoopConf, destPath,
-              localResources, LocalResourceType.FILE, destName, statCache)
-          }
-        } else if (confKey != null) {
+    ).foreach { case (destName, path, confKey) =>
+      if (path != null && !path.trim().isEmpty()) {
+        val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
+        if (isLocal && confKey != null) {
+          require(localizedPath != null, s"Path $path already distributed.")
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
-          sparkConf.set(confKey, localPath)
+          sparkConf.set(confKey, localizedPath)
         }
       }
     }

-    createConfArchive().foreach { file =>
-      require(addDistributedUri(file.toURI()))
-      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
-      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
-        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
-    }

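For the main-resource loop above, only the "local:" case writes back to SparkConf. A minimal sketch, assuming an illustrative `spark.yarn.jar` value (not from this PR):

```scala
// Hypothetical setting: the assembly jar is already present on every node.
sparkConf.set("spark.yarn.jar", "local:/opt/spark/lib/spark-assembly.jar")

// distribute() then returns (isLocal = true, path unchanged), so the loop
// skips the upload and records the path under the corresponding conf key
// for downstream use instead:
sparkConf.set(CONF_SPARK_JAR, "local:/opt/spark/lib/spark-assembly.jar")
```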
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -357,21 +381,10 @@
     ).foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
         flist.split(',').foreach { file =>
-          val localURI = new URI(file.trim())
-          if (localURI.getScheme != LOCAL_SCHEME) {
-            if (addDistributedUri(localURI)) {
-              val localPath = new Path(localURI)
-              val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-              val destPath = copyFileToRemote(dst, localPath, replication)
-              distCacheMgr.addResource(
-                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-              if (addToClasspath) {
-                cachedSecondaryJarLinks += linkname
-              }
-            }
-          } else if (addToClasspath) {
-            // Resource is intended for local use only and should be added to the class path
-            cachedSecondaryJarLinks += file.trim()
+          val (_, localizedPath) = distribute(file, resType = resType)
+          require(localizedPath != null)
+          if (addToClasspath) {
+            cachedSecondaryJarLinks += localizedPath
           }
         }
       }
@@ -380,11 +393,22 @@
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }

+    if (isClusterMode && args.primaryPyFile != null) {
+      distribute(args.primaryPyFile, appMasterOnly = true)
+    }
+
+    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(),
+      resType = LocalResourceType.ARCHIVE,
+      destName = Some(LOCALIZED_CONF_DIR),
+      appMasterOnly = true)
+    require(confLocalizedPath != null)
+
     localResources
   }

   /**
-   * Create an archive with the Hadoop config files for distribution.
+   * Create an archive with the config files for distribution.
    *
    * These are only used by the AM, since executors will use the configuration object broadcast by
    * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
@@ -396,8 +420,11 @@
    *
    * Currently this makes a shallow copy of the conf directory. If there are cases where a
    * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   *
+   * The archive also contains some Spark configuration. Namely, it saves the contents of
+   * SparkConf in a file to be loaded by the AM process.
    */
-  private def createConfArchive(): Option[File] = {
+  private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
@@ -412,28 +439,32 @@
       }
     }

-    if (!hadoopConfFiles.isEmpty) {
-      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
-        new File(Utils.getLocalDir(sparkConf)))
-
-      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
-      try {
-        hadoopConfStream.setLevel(0)
-        hadoopConfFiles.foreach { case (name, file) =>
-          if (file.canRead()) {
-            hadoopConfStream.putNextEntry(new ZipEntry(name))
-            Files.copy(file, hadoopConfStream)
-            hadoopConfStream.closeEntry()
-          }
-        }
-      } finally {
-        hadoopConfStream.close()
-      }
-
-      Some(hadoopConfArchive)
-    } else {
-      None
-    }
+    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+      new File(Utils.getLocalDir(sparkConf)))
+    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
+
+    try {
+      confStream.setLevel(0)
+      hadoopConfFiles.foreach { case (name, file) =>
+        if (file.canRead()) {
+          confStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, confStream)
+          confStream.closeEntry()
+        }
+      }
+
+      // Save Spark configuration to a file in the archive.
+      val props = new Properties()
+      sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+      val writer = new OutputStreamWriter(confStream, UTF_8)
+      props.store(writer, "Spark configuration.")
+      writer.flush()
+      confStream.closeEntry()
+    } finally {
+      confStream.close()
+    }
+    confArchive
   }

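The AM side of `SPARK_CONF_FILE` is outside this diff. As a sketch only, assuming YARN has exploded the conf archive into `LOCALIZED_CONF_DIR` ("__hadoop_conf__" in this PR) under the container's working directory, the saved properties could be read back like this:

```scala
import java.io.{File, FileInputStream, InputStreamReader}
import java.util.Properties

import scala.collection.JavaConversions._

import com.google.common.base.Charsets.UTF_8

import org.apache.spark.SparkConf

// Sketch: load the SparkConf entries written by createConfArchive().
val propsFile = new File("__hadoop_conf__", "__spark_conf__.properties")
val props = new Properties()
val reader = new InputStreamReader(new FileInputStream(propsFile), UTF_8)
try {
  props.load(reader)
} finally {
  reader.close()
}

// Apply each saved entry to a fresh SparkConf for the AM process.
val sparkConf = new SparkConf(loadDefaults = false)
props.stringPropertyNames().foreach { name =>
  sparkConf.set(name, props.getProperty(name))
}
```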
   /**
@@ -912,7 +943,10 @@ object Client extends Logging {
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"

   // Subdirectory where the user's hadoop config files will be placed.
-  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+  val LOCALIZED_CONF_DIR = "__hadoop_conf__"
[Inline review thread on the rename above]

Contributor: Why did you change the name of this? Please revert, as the previous name was more descriptive and the change is unnecessary.

Contributor (author): OK, I got it.

+  // Name of the file in the conf archive containing Spark configuration.
+  val SPARK_CONF_FILE = "__spark_conf__.properties"
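Taken together with `LOCALIZED_CONF_DIR` above, the archive shipped to the AM unpacks to a directory containing the copied Hadoop config files plus the saved Spark properties. A hedged way to see this, assuming a hypothetical temp-file name for the archive produced by `createConfArchive()`:

```scala
import java.util.zip.ZipFile
import scala.collection.JavaConversions._

// Illustrative: list the entries of the conf archive. Expect names like
// "core-site.xml" and "yarn-site.xml" (whatever HADOOP_CONF_DIR/YARN_CONF_DIR
// contain on the submitting machine) plus "__spark_conf__.properties".
val zip = new ZipFile("/tmp/__hadoop_conf__1234.zip")  // hypothetical path
try {
  zip.entries().foreach { entry => println(entry.getName) }
} finally {
  zip.close()
}
```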

   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -1037,7 +1071,7 @@ object Client extends Logging {
     if (isAM) {
       addClasspathEntry(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
-          LOCALIZED_HADOOP_CONF_DIR, env)
+          LOCALIZED_CONF_DIR, env)
     }

     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
         Environment.PWD.$()
       }
     cp should contain(pwdVar)
-    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -122,7 +122,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")

fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
assert(hadoopConfDir.mkdir())
File.createTempFile("token", ".txt", hadoopConfDir)
}
Expand Down