66 changes: 43 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils {

/**
* Prepare the environment for submitting an application.
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
*
* @param args the parsed SparkSubmitArguments used for environment preparation.
* @param conf the Hadoop Configuration; this argument will only be set in unit tests.
* @return a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
*
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
@@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils {
}

// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
var localFiles: String = null
if (deployMode == CLIENT) {
Contributor: If you want to avoid the download for YARN, can we just check the cluster mode here?

val hadoopConf = new HadoopConfiguration()
args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
val hadoopConf = conf.getOrElse(new HadoopConfiguration())
localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull
localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull
localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull
}

// Require all python files to be local, so we can add them to the PYTHONPATH
@@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
Contributor: This is a behavior change. I think we should use localFiles and localPrimaryResource instead of args.files and args.primaryResource.

Contributor (author): Why do you think it should use the local files/primary resource here? args.files will be assigned to "spark.files" for non-YARN deployments, and Spark's file server will download them locally for all the executors, so it should be fine to keep them as remote resources.

Contributor (author): Also, these changes are restricted to client deploy mode; I'm not sure why this is related to standalone cluster mode.

@@ -376,8 +386,8 @@
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
Contributor: Ditto.

Contributor (author): The same reason as above.

}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
if (localPyFiles != null) {
sysProps("spark.submit.pyFiles") = localPyFiles
}
}

@@ -431,7 +441,7 @@ object SparkSubmit extends CommandLineUtils {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
@@ -468,6 +478,7 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -491,15 +502,28 @@
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),

// An internal option used only by spark-shell to add user jars to the repl's classloader.
// Previously it used "spark.jars" or "spark.yarn.dist.jars", which may now point to remote
// jars, so a new option is added to specify only the local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
)

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
// Also add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
if (deployMode == CLIENT || isYarnCluster) {
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, otherwise they will not
// be added to the classpath of the YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
@@ -556,10 +580,6 @@ object SparkSubmit extends CommandLineUtils {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}

if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

// assure a keytab is available from any place in a JVM
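
For reference, a minimal sketch (not part of the patch) of how the new optional conf parameter to prepareSubmitEnvironment can be exercised. It mirrors the test added below, assumes it runs inside the org.apache.spark.deploy package (the method is private[deploy]), and uses hypothetical s3a:// paths like the ones the suite backs with a local TestFileSystem:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}

val hadoopConf = new Configuration()
val appArgs = new SparkSubmitArguments(Seq(
  "--master", "yarn",
  "--deploy-mode", "client",
  "--class", "example.Main",          // hypothetical main class
  "--jars", "s3a://bucket/dep.jar",   // hypothetical remote jar
  "--files", "s3a://bucket/data.txt", // hypothetical remote file
  "s3a://bucket/app.jar"))            // hypothetical primary resource
// The returned 4-tuple: child arguments, child classpath, system properties, child main class.
val (childArgs, childClasspath, sysProps, childMainClass) =
  SparkSubmit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
// sysProps("spark.yarn.dist.jars") should keep the remote s3a:// path (so YARN does not
// re-upload it), while sysProps("spark.repl.local.jars") points at the downloaded local copy.
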
@@ -87,7 +87,7 @@ package object config {
.intConf
.createOptional

private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
.internal()
.stringConf
.toSequence
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2580,18 +2580,23 @@ private[spark] object Utils extends Logging {
}

/**
* In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
* "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
* only the "spark.jars" property.
* Return the jar files pointed to by the "spark.jars" property. Spark will internally distribute
* these jars through its file server. In YARN mode, this returns an empty list, since YARN
* has its own mechanism to distribute jars.
*/
def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
def getUserJars(conf: SparkConf): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
if (conf.get("spark.master") == "yarn" && isShell) {
val yarnJars = conf.getOption("spark.yarn.dist.jars")
unionFileLists(sparkJars, yarnJars).toSeq
} else {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}

/**
* Return the local jar files which will be added to the REPL's classpath. These jar files are
* specified by --jars (spark.jars) or --packages; remote jars will be downloaded locally by
* SparkSubmit first.
*/
def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
val localJars = conf.getOption("spark.repl.local.jars")
localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}

private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
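
For context, a minimal sketch of how the two helpers differ, assuming Spark-internal code (Utils is private[spark]) and a SparkConf already populated by SparkSubmit in client mode; the jar paths are hypothetical:

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
  .set("spark.jars", "s3a://bucket/dep.jar")          // may still be a remote URI
  .set("spark.repl.local.jars", "file:/tmp/dep.jar")  // local copy downloaded by SparkSubmit

val jarsForFileServer = Utils.getUserJars(conf)                  // Seq("s3a://bucket/dep.jar")
val jarsForReplClasspath = Utils.getLocalUserJarsForShell(conf)  // Seq("file:/tmp/dep.jar")
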
68 changes: 55 additions & 13 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -27,7 +27,7 @@ import scala.io.Source
import com.google.common.io.ByteStreams
import org.apache.commons.io.{FilenameUtils, FileUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@@ -738,10 +738,7 @@ class SparkSubmitSuite

test("downloadFile - file doesn't exist") {
val hadoopConf = new Configuration()
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
updateConfWithFakeS3Fs(hadoopConf)
intercept[FileNotFoundException] {
SparkSubmit.downloadFile("s3a:/no/such/file", hadoopConf)
}
@@ -759,10 +756,7 @@
val content = "hello, world"
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
updateConfWithFakeS3Fs(hadoopConf)
val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
val outputPath = SparkSubmit.downloadFile(sourcePath, hadoopConf)
checkDownloadedFile(sourcePath, outputPath)
@@ -775,10 +769,7 @@
val content = "hello, world"
FileUtils.write(jarFile, content)
val hadoopConf = new Configuration()
// Set s3a implementation to local file system for testing.
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
// Disable file system impl cache to make sure the test file system is picked up.
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
updateConfWithFakeS3Fs(hadoopConf)
val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
val outputPaths = SparkSubmit.downloadFileList(sourcePaths.mkString(","), hadoopConf).split(",")

@@ -789,6 +780,43 @@
}
}

test("Avoid re-upload remote resources in yarn client mode") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)

val tmpDir = Utils.createTempDir()
val file = File.createTempFile("tmpFile", "", tmpDir)
val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"

val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--jars", tmpJarPath,
"--files", s"s3a://${file.getAbsolutePath}",
"--py-files", s"s3a://${pyFile.getAbsolutePath}",
s"s3a://$mainResource"
)

val appArgs = new SparkSubmitArguments(args)
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3

// All the resources should still be remote paths, so that the YARN client will not upload them again.
sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")

// Local repl jars should be a local path.
sysProps("spark.repl.local.jars") should (startWith("file:"))

// Local py files should not be in URI format.
sysProps("spark.submit.pyFiles") should (startWith("/"))
}

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -828,6 +856,11 @@
Utils.deleteRecursively(tmpDir)
}
}

private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
conf.set("fs.s3a.impl.disable.cache", "true")
}
}

object JarCreationTest extends Logging {
@@ -897,4 +930,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
// Ignore the scheme for testing.
super.copyToLocalFile(new Path(src.toUri.getPath), dst)
}

override def globStatus(pathPattern: Path): Array[FileStatus] = {
val newPath = new Path(pathPattern.toUri.getPath)
super.globStatus(newPath).map { status =>
val path = s"s3a://${status.getPath.toUri.getPath}"
status.setPath(new Path(path))
status
}
}
}
@@ -1066,7 +1066,7 @@ class SparkILoop(
logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
}
val jars = {
val userJars = Utils.getUserJars(conf, isShell = true)
val userJars = Utils.getLocalUserJarsForShell(conf)
if (userJars.isEmpty) {
envJars.getOrElse("")
} else {
@@ -57,7 +57,7 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = Utils.getUserJars(conf, isShell = true)
val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)