From 838d301b9a2ef7c549fdf42c8013e6477e5c12f5 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 24 Feb 2015 17:43:46 -0800
Subject: [PATCH 01/15] [SPARK-5979] Made exclusions more refined

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 48 +++++++++++++------
 .../spark/deploy/SparkSubmitUtilsSuite.scala  | 17 ++++++-
 2 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c4110812e0a1..c826e679e82db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
    * @param coordinates Comma-delimited string of maven coordinates
    * @return Sequence of Maven coordinates
    */
@@ -748,6 +747,35 @@ private[spark] object SparkSubmitUtils {
     }
   }

+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
+
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
     ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -811,19 +839,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)

-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)

       // resolve dependencies
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ad62b35f624f6..ffba8e0c2c51a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy

 import java.io.{PrintStream, OutputStream, File}

+import org.apache.spark.util.Utils
+
 import scala.collection.mutable.ArrayBuffer

 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -26,6 +28,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.ivy.core.module.descriptor.MDArtifact
 import org.apache.ivy.plugins.resolver.IBiblioResolver

+import org.apache.spark.TestUtils
+
 class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {

   private val noOpOutputStream = new OutputStream {
@@ -117,8 +121,19 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }

   test("neglects Spark and Spark's dependencies") {
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",")
     val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+      coordinates, None, None, true)
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
   }
 }

From 9bacee80e4ae27398138c31262924ed92bcdfe07 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 24 Feb 2015 17:47:18 -0800
Subject: [PATCH 02/15] added fake core dependency that should be excluded

---
 .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ffba8e0c2c51a..b0a85b76f6758 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -125,7 +125,8 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
       "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

     val coordinates =
-      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",")
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
     val path = SparkSubmitUtils.resolveMavenCoordinates(
       coordinates, None, None, true)
     assert(path === "", "should return empty path")

From 941c65e61cfe5e7fb69c7ca12b19e54f2b5e8cd6 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 24 Feb 2015 17:48:54 -0800
Subject: [PATCH 03/15] removed unused imports

---
 .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index b0a85b76f6758..39d6f2fbb0428 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy

 import java.io.{PrintStream, OutputStream, File}

-import org.apache.spark.util.Utils
-
 import scala.collection.mutable.ArrayBuffer

 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -28,8 +26,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.ivy.core.module.descriptor.MDArtifact
 import org.apache.ivy.plugins.resolver.IBiblioResolver

-import org.apache.spark.TestUtils
-
 class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {

   private val noOpOutputStream = new OutputStream {

From e3ca1b73e99ee44dd99ff37b3d61fcc8f9b1eb51 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 24 Feb 2015 22:00:15 -0800
Subject: [PATCH 04/15] Rerouted ivy output to system.err

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c826e679e82db..3622650e957d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -796,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -854,6 +857,7 @@ private[spark] object SparkSubmitUtils {
         packagesDirectory.getAbsolutePath + File.separator +
           "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }

From 5191f3a09b3924824f9f1111b9d156cafc1443cc Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 24 Feb 2015 22:23:13 -0800
Subject: [PATCH 05/15] add ivy jars to driver-extra-classpath explicitly for pyspark issues with --jars

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3622650e957d5..a8bfb5ef2b00b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -258,18 +258,18 @@ object SparkSubmit {
       SparkSubmitUtils.resolveMavenCoordinates(
         args.packages, Option(args.repositories), Option(args.ivyRepoPath))
     if (!resolvedMavenCoordinates.trim.isEmpty) {
-      if (args.jars == null || args.jars.trim.isEmpty) {
-        args.jars = resolvedMavenCoordinates
-      } else {
-        args.jars += s",$resolvedMavenCoordinates"
-      }
-      if (args.isPython) {
-        if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
-          args.pyFiles = resolvedMavenCoordinates
+      def addMavenDependenciesToOption(option: String): String = {
+        if (option == null || option.trim.isEmpty) {
+          resolvedMavenCoordinates
         } else {
-          args.pyFiles += s",$resolvedMavenCoordinates"
+          option + s",$resolvedMavenCoordinates"
         }
       }
+      args.jars = addMavenDependenciesToOption(args.jars)
+      args.driverExtraClassPath = addMavenDependenciesToOption(args.driverExtraClassPath)
+      if (args.isPython) {
+        args.pyFiles = addMavenDependenciesToOption(args.pyFiles)
+      }
     }
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local

From d9e3cf0272b5a4c882b55457354395d047b61df9 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 14:51:45 -0800
Subject: [PATCH 06/15] removed redundant commands

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 17 +++--------------
 1 file changed, 3 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a8bfb5ef2b00b..71d32d523f64a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -257,20 +257,9 @@ object SparkSubmit {
     val resolvedMavenCoordinates =
       SparkSubmitUtils.resolveMavenCoordinates(
         args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-    if (!resolvedMavenCoordinates.trim.isEmpty) {
-      def addMavenDependenciesToOption(option: String): String = {
-        if (option == null || option.trim.isEmpty) {
-          resolvedMavenCoordinates
-        } else {
-          option + s",$resolvedMavenCoordinates"
-        }
-      }
-      args.jars = addMavenDependenciesToOption(args.jars)
-      args.driverExtraClassPath = addMavenDependenciesToOption(args.driverExtraClassPath)
-      if (args.isPython) {
-        args.pyFiles = addMavenDependenciesToOption(args.pyFiles)
-      }
-    }
+    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+    if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local

From f25c55be9c78cd8bb386b1b5cacde02c72baea79 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 17:53:23 -0800
Subject: [PATCH 07/15] added windows support

---
 bin/spark-submit                              |  6 +++-
 bin/spark-submit2.cmd                         |  7 +++++
 .../org/apache/spark/deploy/SparkSubmit.scala | 31 +++++++++++--------
 .../spark/deploy/SparkSubmitArguments.scala   |  6 ++++
 .../SparkSubmitDriverBootstrapper.scala       | 16 +++++++++-
 .../spark/deploy/SparkSubmitUtilsSuite.scala  |  9 +++---
 6 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/bin/spark-submit b/bin/spark-submit
index 3e5cbdbb24394..bf826ff95b5ba 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -38,6 +38,10 @@ while (($#)); do
     export SPARK_SUBMIT_CLASSPATH=$2
   elif [ "$1" = "--driver-java-options" ]; then
     export SPARK_SUBMIT_OPTS=$2
+  elif [ "$1" = "--packages" ]; then
+    export SPARK_SUBMIT_PACKAGES=$2
+  elif [ "$1" = "--repositories" ]; then
+    export SPARK_SUBMIT_REPOSITORIES=$2
   elif [ "$1" = "--master" ]; then
     export MASTER=$2
   fi
@@ -65,7 +69,7 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
   grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
     grep -v "^[[:space:]]*#"
   )
-  if [ -n "$contains_special_configs" ]; then
+  if [[ -n "$contains_special_configs" || -n "$SPARK_SUBMIT_PACKAGES" ]]; then
     export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
   fi
 fi
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 446cbc74b74f9..803d83338b3cd 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -49,6 +49,10 @@ if [%1] == [] goto continue
     set SPARK_SUBMIT_CLASSPATH=%2
   ) else if [%1] == [--driver-java-options] (
     set SPARK_SUBMIT_OPTS=%2
+  ) else if [%1] == [--packages] (
+    set SPARK_SUBMIT_PACKAGES=%2
+  ) else if [%1] == [--repositories] (
+    set SPARK_SUBMIT_REPOSITORIES=%2
   ) else if [%1] == [--master] (
     set MASTER=%2
   )
@@ -73,6 +77,9 @@ if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
       set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
     )
   )
+  if [%SPARK_SUBMIT_PACKAGES%] != [] (
+    set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+  )
 )

 cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 71d32d523f64a..d67ea6a54e220 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -251,15 +251,20 @@ object SparkSubmit {
     }

     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
-
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val resolvedMavenCoordinates =
-      SparkSubmitUtils.resolveMavenCoordinates(
-        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-    if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
-
+    println(s"\n\n${args.packagesResolved}\n\n")
+    if (args.packagesResolved != null) {
+      args.jars = mergeFileLists(args.jars, args.packagesResolved)
+      if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, args.packagesResolved)
+    } else {
+      // This part needs to be added in case SparkSubmitDriverBootstrapper is not called
+      val resolvedMavenCoordinates =
+        SparkSubmitUtils.resolveMavenCoordinates(
+          args.packages, Option(args.repositories), Option(args.ivyRepoPath)).mkString(",")
+      if (resolvedMavenCoordinates.trim.length > 0) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      }
+    }

     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local
@@ -713,13 +718,13 @@ private[spark] object SparkSubmitUtils {
    */
   private[spark] def resolveDependencyPaths(
       artifacts: Array[AnyRef],
-      cacheDirectory: File): String = {
+      cacheDirectory: File): Seq[String] = {
     artifacts.map { artifactInfo =>
       val artifactString = artifactInfo.toString
       val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
       cacheDirectory.getAbsolutePath + File.separator +
         jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
-    }.mkString(",")
+    }
   }

   /** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -781,9 +786,9 @@ private[spark] object SparkSubmitUtils {
       coordinates: String,
       remoteRepos: Option[String],
       ivyPath: Option[String],
-      isTest: Boolean = false): String = {
+      isTest: Boolean = false): Seq[String] = {
     if (coordinates == null || coordinates.trim.isEmpty) {
-      ""
+      Seq("")
     } else {
       val sysOut = System.out
       // To prevent ivy from logging to system out
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 82e66a374249c..22f5926df8150 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -50,6 +50,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
   var packages: String = null
+  var packagesResolved: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
   var verbose: Boolean = false
@@ -395,6 +396,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         packages = value
         parse(tail)

+      // Internal flag to receive the resolved maven jars and add to --jars
+      case ("--packages-resolved") :: value :: tail =>
+        packagesResolved = Utils.resolveURIs(value)
+        parse(tail)
+
       case ("--repositories") :: value :: tail =>
         repositories = value
         parse(tail)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 2eab9981845e8..29c4d528b874b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -58,6 +58,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
   val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
   val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
   val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
+  val submitPackages = sys.env.getOrElse("SPARK_SUBMIT_PACKAGES", "")
+  val submitRepositories = sys.env.get("SPARK_SUBMIT_REPOSITORIES")

   assume(runner != null, "RUNNER must be set")
   assume(classpath != null, "CLASSPATH must be set")
@@ -73,6 +75,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
   val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
   val confClasspath = properties.get("spark.driver.extraClassPath")
   val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
+  val confIvyRepo = properties.get("spark.jars.ivy")

   // Favor Spark submit arguments over the equivalent configs in the properties file.
   // Note that we do not actually use the Spark submit values for library path, classpath,
@@ -82,13 +85,23 @@ private[spark] object SparkSubmitDriverBootstrapper {
     .orElse(confDriverMemory)
     .getOrElse(defaultDriverMemory)

-  val newClasspath =
+  var newClasspath =
     if (submitClasspath.isDefined) {
       classpath
     } else {
       classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
     }

+  // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+  // too for packages that include Python code
+  val resolvedMavenCoordinates =
+    SparkSubmitUtils.resolveMavenCoordinates(
+      submitPackages, submitRepositories, confIvyRepo)
+  if (resolvedMavenCoordinates.head.length > 0) {
+    newClasspath += sys.props("path.separator") +
+      resolvedMavenCoordinates.mkString(sys.props("path.separator"))
+  }
+
   val newJavaOpts =
     if (submitJavaOpts.isDefined) {
       // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
@@ -108,6 +121,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     filteredJavaOpts ++
     Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
     Seq("org.apache.spark.deploy.SparkSubmit") ++
+    Seq("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++
     submitArgs

   // Print the launch command. This follows closely the format used in `bin/spark-class`.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 39d6f2fbb0428..def7056e4ac0f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -91,7 +91,8 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
     val ivyPath = "dummy/ivy"
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
-    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
+    var jPaths =
+      SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath)).mkString(",")
     for (i <- 0 until 3) {
       val index = jPaths.indexOf(ivyPath)
       assert(index >= 0)
@@ -99,13 +100,13 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
     }
     // end to end
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
-      "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true)
+      "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true).mkString(",")
     assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
   }

   test("search for artifact at other repositories") {
     val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3",
-      Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true)
+      Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true).mkString(",")
     assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check" +
       "if package still exists. If it has been removed, replace the example in this test.")
   }
@@ -124,7 +125,7 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
       components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
       ",org.apache.spark:spark-core_fake:1.2.0"
     val path = SparkSubmitUtils.resolveMavenCoordinates(
-      coordinates, None, None, true)
+      coordinates, None, None, true).mkString(",")
     assert(path === "", "should return empty path")
     // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
     // but the fact that it is checking means that it wasn't excluded.

From 43c3cb28318b81c8416c158bc328a86b9a0b5847 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 17:59:48 -0800
Subject: [PATCH 08/15] removed debug line

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d67ea6a54e220..1ad11cb93557e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -251,7 +251,6 @@ object SparkSubmit {
     }

     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
-    println(s"\n\n${args.packagesResolved}\n\n")
     if (args.packagesResolved != null) {
       args.jars = mergeFileLists(args.jars, args.packagesResolved)
       if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, args.packagesResolved)

From 12e2764aa6c140ffa2b1e80d69483b3f19467f07 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 20:47:34 -0800
Subject: [PATCH 09/15] tried to fix yarn test

---
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 29c4d528b874b..ad933f993de9b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -44,7 +44,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     System.exit(1)
   }

-  val submitArgs = args
+  var submitArgs = args
  val runner = sys.env("RUNNER")
  val classpath = sys.env("CLASSPATH")
  val javaOpts = sys.env("JAVA_OPTS")
@@ -100,6 +100,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
   if (resolvedMavenCoordinates.head.length > 0) {
     newClasspath += sys.props("path.separator") +
       resolvedMavenCoordinates.mkString(sys.props("path.separator"))
+    submitArgs ++= Seq("--packages-resolved", resolvedMavenCoordinates.mkString(","))
   }

   val newJavaOpts =
     if (submitJavaOpts.isDefined) {
       // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
@@ -121,7 +122,6 @@ private[spark] object SparkSubmitDriverBootstrapper {
     filteredJavaOpts ++
     Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
     Seq("org.apache.spark.deploy.SparkSubmit") ++
-    Seq("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++
     submitArgs

   // Print the launch command. This follows closely the format used in `bin/spark-class`.

From f06a7543ab17b977cda62203d939dec6f4f24882 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 20:52:19 -0800
Subject: [PATCH 10/15] prepend --packages-resolved instead of append

---
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index ad933f993de9b..29e3cc268eb43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -100,7 +100,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
   if (resolvedMavenCoordinates.head.length > 0) {
     newClasspath += sys.props("path.separator") +
       resolvedMavenCoordinates.mkString(sys.props("path.separator"))
-    submitArgs ++= Seq("--packages-resolved", resolvedMavenCoordinates.mkString(","))
+    submitArgs =
+      Array("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++ submitArgs
   }

   val newJavaOpts =

From c73aabe78972f30903799dbeef1a5c411a729316 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 22:47:52 -0800
Subject: [PATCH 11/15] address review comments

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 30 ++++++++++---------
 .../spark/deploy/SparkSubmitArguments.scala   |  3 +-
 .../SparkSubmitDriverBootstrapper.scala       |  2 +-
 3 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1ad11cb93557e..6bf9414570120 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -251,17 +251,19 @@ object SparkSubmit {
     }

     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
-    if (args.packagesResolved != null) {
-      args.jars = mergeFileLists(args.jars, args.packagesResolved)
-      if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, args.packagesResolved)
-    } else {
-      // This part needs to be added in case SparkSubmitDriverBootstrapper is not called
-      val resolvedMavenCoordinates =
-        SparkSubmitUtils.resolveMavenCoordinates(
-          args.packages, Option(args.repositories), Option(args.ivyRepoPath)).mkString(",")
-      if (resolvedMavenCoordinates.trim.length > 0) {
-        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-        if (args.isPython) args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+    val packagesResolved =
+      if (args.packagesResolved != null) {
+        // SparkSubmitDriverBootstrapper already downloaded the jars for us
+        args.packagesResolved
+      } else {
+        SparkSubmitUtils.resolveMavenCoordinates(args.packages, Option(args.repositories),
+          Option(args.ivyRepoPath)).mkString(",")
+      }
+
+    if (packagesResolved.nonEmpty) {
+      args.jars = mergeFileLists(args.jars, packagesResolved)
+      if (args.isPython) {
+        args.pyFiles = mergeFileLists(args.pyFiles, packagesResolved)
       }
     }

@@ -715,7 +717,7 @@ private[spark] object SparkSubmitUtils {
    * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
    * @param artifacts Sequence of dependencies that were resolved and retrieved
    * @param cacheDirectory directory where jars are cached
-   * @return a comma-delimited list of paths for the dependencies
+   * @return A sequence of paths for the dependencies
    */
   private[spark] def resolveDependencyPaths(
       artifacts: Array[AnyRef],
@@ -778,7 +780,7 @@ private[spark] object SparkSubmitUtils {
   * @param coordinates Comma-delimited string of maven coordinates
   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
   * @param ivyPath The path to the local ivy repository
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   * @return A sequence of paths to the jars of the given maven artifacts including their
    * transitive dependencies
    */
   private[spark] def resolveMavenCoordinates(
@@ -787,7 +789,7 @@ private[spark] object SparkSubmitUtils {
       ivyPath: Option[String],
       isTest: Boolean = false): Seq[String] = {
     if (coordinates == null || coordinates.trim.isEmpty) {
-      Seq("")
+      Seq.empty
     } else {
       val sysOut = System.out
       // To prevent ivy from logging to system out
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 22f5926df8150..5ff3369e46921 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -396,7 +396,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         packages = value
         parse(tail)

-      // Internal flag to receive the resolved maven jars and add to --jars
+      // Spark-6031 Internal flag to receive the resolved maven jars and add to --jars.
+      // This is only passed through the Bootstrapper
       case ("--packages-resolved") :: value :: tail =>
         packagesResolved = Utils.resolveURIs(value)
         parse(tail)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 29e3cc268eb43..8a48c5bc32536 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -97,7 +97,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
   val resolvedMavenCoordinates =
     SparkSubmitUtils.resolveMavenCoordinates(
       submitPackages, submitRepositories, confIvyRepo)
-  if (resolvedMavenCoordinates.head.length > 0) {
+  if (resolvedMavenCoordinates.nonEmpty) {
     newClasspath += sys.props("path.separator") +
       resolvedMavenCoordinates.mkString(sys.props("path.separator"))
     submitArgs =
       Array("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++ submitArgs

From 7f958c18afcd50b331ec11b5c4fb4b5db483758c Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 25 Feb 2015 22:56:44 -0800
Subject: [PATCH 12/15] fixed wrong comment

---
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 8a48c5bc32536..c56ad49df3efa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -92,8 +92,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
       classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
     }

-  // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-  // too for packages that include Python code
+  // Resolve maven dependencies if there are any and add them to classpath. Also send them
+  // to SparkSubmit so that they can be shipped to executors.
   val resolvedMavenCoordinates =
     SparkSubmitUtils.resolveMavenCoordinates(
       submitPackages, submitRepositories, confIvyRepo)

From b7a9e93ef990317484a1150aef03cdba1c05e9c9 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 26 Feb 2015 07:37:05 -0800
Subject: [PATCH 13/15] move if condition

---
 bin/spark-submit | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/bin/spark-submit b/bin/spark-submit
index bf826ff95b5ba..d5bf0ed5f082e 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -63,12 +63,14 @@ export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PR
 # paths, library paths, java options and memory early on. Otherwise, it will
 # be too late by the time the driver JVM has started.

-if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
-  # Parse the properties file only if the special configs exist
-  contains_special_configs=$(
-    grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
-    grep -v "^[[:space:]]*#"
-  )
+if [ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" ]; then
+  if [ -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]; then
+    # Parse the properties file only if the special configs exist
+    contains_special_configs=$(
+      grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
+      grep -v "^[[:space:]]*#"
+    )
+  fi
   if [[ -n "$contains_special_configs" || -n "$SPARK_SUBMIT_PACKAGES" ]]; then
     export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
   fi

From 994869e3c2b279592bac6c858ca41e6f186b0d60 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 26 Feb 2015 12:15:50 -0800
Subject: [PATCH 14/15] Revert "move if condition"

This reverts commit b7a9e93ef990317484a1150aef03cdba1c05e9c9.
---
 bin/spark-submit | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/bin/spark-submit b/bin/spark-submit
index d5bf0ed5f082e..bf826ff95b5ba 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -63,14 +63,12 @@ export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PR
 # paths, library paths, java options and memory early on. Otherwise, it will
 # be too late by the time the driver JVM has started.

-if [ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" ]; then
-  if [ -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]; then
-    # Parse the properties file only if the special configs exist
-    contains_special_configs=$(
-      grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
-      grep -v "^[[:space:]]*#"
-    )
-  fi
+if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
+  # Parse the properties file only if the special configs exist
+  contains_special_configs=$(
+    grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
+    grep -v "^[[:space:]]*#"
+  )
   if [[ -n "$contains_special_configs" || -n "$SPARK_SUBMIT_PACKAGES" ]]; then
     export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
   fi

From 44dbf671887476de2c2b1723bec18f144ab769ff Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 26 Feb 2015 13:31:18 -0800
Subject: [PATCH 15/15] fix windows checking

---
 bin/spark-submit2.cmd | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 803d83338b3cd..1b638553e86ca 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -76,9 +76,9 @@ if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
       %SPARK_SUBMIT_PROPERTIES_FILE%') do (
       set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
     )
-  )
-  if [%SPARK_SUBMIT_PACKAGES%] != [] (
-    set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    if [%SPARK_SUBMIT_PACKAGES%] NEQ [] (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
   )
 )