core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -94,7 +94,7 @@ private[deploy] object DependencyUtils {
hadoopConf: Configuration,
secMgr: SecurityManager): String = {
require(fileList != null, "fileList cannot be null.")
fileList.split(",")
Utils.stringToSeq(fileList)
.map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
.mkString(",")
}
@@ -121,6 +121,11 @@ private[deploy] object DependencyUtils {

uri.getScheme match {
case "file" | "local" => path
case "http" | "https" | "ftp" if Utils.isTesting =>
Contributor Author: @vanzin, it is a little difficult to mock the download behavior, so here I check whether "spark.testing" is configured and return a dummy local path if it is. What do you think of this approach?

Contributor: Sounds good.
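For reference, a minimal sketch of the guard this branch relies on. The definition of `Utils.isTesting` is not part of this diff, so the body below is an assumption about how the "spark.testing" check presumably works:

```scala
// Assumed shape of Utils.isTesting: true when running under Spark's test harness,
// i.e. when the SPARK_TESTING env var or the "spark.testing" system property is set.
def isTesting: Boolean = {
  sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
```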

// This is only used by the SparkSubmitSuite unit test. Instead of downloading the file
// remotely, return a dummy local path.
val file = new File(uri.getPath)
new File(targetDir, file.getName).toURI.toString
case _ =>
val fname = new Path(uri).getName()
val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
@@ -131,7 +136,7 @@

def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
require(paths != null, "paths cannot be null.")
paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
Utils.stringToSeq(paths).flatMap { path =>
val uri = Utils.resolveURI(path)
uri.getScheme match {
case "local" | "http" | "https" | "ftp" => Array(path)
51 changes: 49 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -25,11 +25,11 @@ import java.text.ParseException

import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties
import scala.util.{Properties, Try}

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
@@ -48,6 +48,7 @@ import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._

@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
}.orNull
}

// When running on YARN, some remote resources need to be downloaded locally first, namely when:
Contributor: Is it a problem only for YARN? Do standalone and Mesos have this problem?

Contributor Author: This is currently a problem only for YARN, because YARN uses the distributed cache to distribute resources to the cluster, and the distributed cache requires a supported Hadoop FileSystem to copy resources. If a resource's scheme is http, it will try to find an http FileSystem to handle it, which fails because current Hadoop releases don't ship one.

In standalone and Mesos cluster modes, Spark's internal logic handles http(s) resources, and it handles them well, so there should be no issue for those modes.
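For illustration, a minimal standalone sketch (not part of this patch) of the failure described above: asking Hadoop for a FileSystem implementation for the "http" scheme throws on releases that ship none, which is exactly what the shouldDownload check below relies on.

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// On Hadoop releases without an http FileSystem, getFileSystemClass throws
// ("No FileSystem for scheme: http"), so the Try fails and Spark has to download
// the resource itself before handing it to YARN's distributed cache.
val hadoopConf = new Configuration()
val httpSupported = Try(FileSystem.getFileSystemClass("http", hadoopConf)).isSuccess
```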

// 1. Hadoop FileSystem doesn't support their scheme, or
// 2. we explicitly bypass Hadoop FileSystem via "spark.yarn.dist.forceDownloadSchemes".
// We download them to local disk prior to adding them to YARN's distributed cache.
// For yarn client mode, since the code above has already downloaded them, we only need to
// figure out the local path and replace the remote one.
if (clusterManager == YARN) {
sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
val secMgr = new SecurityManager(sparkConf)
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains(scheme) ||
Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}

def downloadResource(resource: String): String = {
val uri = Utils.resolveURI(resource)
uri.getScheme match {
case "local" | "file" => resource
case e if shouldDownload(e) =>
Contributor: Shall we explicitly list "http" | "https" | "ftp"?

Contributor Author: No, it is not required, because the shouldDownload logic handles this. If 1) the resource scheme is blacklisted, or 2) it is not supported by Hadoop, then Spark will handle the resource through the downloadFile method. Since "http" | "https" | "ftp" are not supported by Hadoop before 2.9, resources with those schemes will be handled by Spark itself.

val file = new File(targetDir, new Path(uri).getName)
if (file.exists()) {
file.toURI.toString
} else {
downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
}
case _ => uri.toString
}
}

args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
args.files = Option(args.files).map { files =>
Utils.stringToSeq(files).map(downloadResource).mkString(",")
}.orNull
args.pyFiles = Option(args.pyFiles).map { pyFiles =>
Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
}.orNull
args.jars = Option(args.jars).map { jars =>
Utils.stringToSeq(jars).map(downloadResource).mkString(",")
}.orNull
args.archives = Option(args.archives).map { archives =>
Utils.stringToSeq(archives).map(downloadResource).mkString(",")
}.orNull
}

// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -400,4 +400,14 @@ package object config {
.doc("Memory to request as a multiple of the size that used to unroll the block.")
.doubleConf
.createWithDefault(1.5)

private[spark] val FORCE_DOWNLOAD_SCHEMES =
ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
.doc("Comma-separated list of schemes for which files will be downloaded to the " +
"local disk prior to being added to YARN's distributed cache. For use in cases " +
"where the YARN service does not support schemes that are supported by Spark, like http, " +
"https and ftp.")
.stringConf
.toSequence
.createWithDefault(Nil)
}
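A quick usage sketch (my illustration, not part of the patch): the flag is passed like any other Spark conf and read back through the typed entry above, mirroring how SparkSubmit consumes it earlier in this diff. Note that ConfigEntry-based `get` is Spark-internal API.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// Equivalent to: spark-submit --conf spark.yarn.dist.forceDownloadSchemes=http,https ...
val conf = new SparkConf().set("spark.yarn.dist.forceDownloadSchemes", "http,https")

// Inside Spark, the typed entry yields a Seq[String]: Seq("http", "https")
val schemes: Seq[String] = conf.get(FORCE_DOWNLOAD_SCHEMES)
```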
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2684,6 +2684,9 @@ private[spark] object Utils extends Logging {
redact(redactionPattern, kvs.toArray)
}

def stringToSeq(str: String): Seq[String] = {
str.split(",").map(_.trim()).filter(_.nonEmpty)
}
}

private[util] object CallerContext extends Logging {
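For reference, the behavior of the new helper (inferred from the one-liner above): it trims entries and drops empties, so ragged comma-separated input normalizes cleanly.

```scala
Utils.stringToSeq("a.jar, b.jar,, c.jar ")  // Seq("a.jar", "b.jar", "c.jar")
Utils.stringToSeq("")                       // Seq()
```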
65 changes: 65 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -897,6 +897,71 @@ class SparkSubmitSuite
sysProps("spark.submit.pyFiles") should (startWith("/"))
}

test("download remote resource if it is not supported by yarn service") {
testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
}

test("avoid downloading remote resource if it is supported by yarn service") {
testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
}

test("force download from blacklisted schemes") {
testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true)
}

private def testRemoteResources(isHttpSchemeBlacklisted: Boolean,
supportMockHttpFs: Boolean): Unit = {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
if (supportMockHttpFs) {
hadoopConf.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName)
hadoopConf.set("fs.http.impl.disable.cache", "true")
}

val tmpDir = Utils.createTempDir()
val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
val tmpS3Jar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpS3JarPath = s"s3a://${new File(tmpS3Jar.toURI).getAbsolutePath}"
val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"

val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--jars", s"$tmpS3JarPath,$tmpHttpJarPath",
s"s3a://$mainResource"
) ++ (
if (isHttpSchemeBlacklisted) {
Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http,https")
} else {
Nil
}
)

val appArgs = new SparkSubmitArguments(args)
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3

val jars = sysProps("spark.yarn.dist.jars").split(",").toSet

// The URI of remote S3 resource should still be remote.
assert(jars.contains(tmpS3JarPath))

if (supportMockHttpFs) {
// If Http FS is supported by yarn service, the URI of remote http resource should
// still be remote.
assert(jars.contains(tmpHttpJarPath))
} else {
// If Http FS is not supported by the yarn service, or the http scheme is configured to force
// downloading, the URI of the remote http resource should be changed to a local one.
val jarName = new File(tmpHttpJar.toURI).getName
val localHttpJar = jars.filter(_.contains(jarName))
localHttpJar.size should be(1)
localHttpJar.head should startWith("file:")
}
}

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
9 changes: 9 additions & 0 deletions docs/running-on-yarn.md
@@ -211,6 +211,15 @@ To use a custom metrics.properties for the application master and executors, upd
Comma-separated list of jars to be placed in the working directory of each executor.
</td>
</tr>
<tr>
<td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
<td><code>(none)</code></td>
<td>
Comma-separated list of schemes for which files will be downloaded to the local disk prior to
being added to YARN's distributed cache. For use in cases where the YARN service does not
support schemes that are supported by Spark, like http, https and ftp.
</td>
</tr>
<tr>
<td><code>spark.executor.instances</code></td>
<td><code>2</code></td>