diff --git a/bin/spark-submit b/bin/spark-submit
index 3e5cbdbb24394..bf826ff95b5ba 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -38,6 +38,10 @@ while (($#)); do
     export SPARK_SUBMIT_CLASSPATH=$2
   elif [ "$1" = "--driver-java-options" ]; then
     export SPARK_SUBMIT_OPTS=$2
+  elif [ "$1" = "--packages" ]; then
+    export SPARK_SUBMIT_PACKAGES=$2
+  elif [ "$1" = "--repositories" ]; then
+    export SPARK_SUBMIT_REPOSITORIES=$2
   elif [ "$1" = "--master" ]; then
     export MASTER=$2
   fi
@@ -65,7 +69,7 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
     grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
     grep -v "^[[:space:]]*#"
   )
-  if [ -n "$contains_special_configs" ]; then
+  if [[ -n "$contains_special_configs" || -n "$SPARK_SUBMIT_PACKAGES" ]]; then
     export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
   fi
 fi
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 446cbc74b74f9..1b638553e86ca 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -49,6 +49,10 @@ if [%1] == [] goto continue
     set SPARK_SUBMIT_CLASSPATH=%2
   ) else if [%1] == [--driver-java-options] (
     set SPARK_SUBMIT_OPTS=%2
+  ) else if [%1] == [--packages] (
+    set SPARK_SUBMIT_PACKAGES=%2
+  ) else if [%1] == [--repositories] (
+    set SPARK_SUBMIT_REPOSITORIES=%2
   ) else if [%1] == [--master] (
     set MASTER=%2
   )
@@ -72,6 +76,9 @@ if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
       %SPARK_SUBMIT_PROPERTIES_FILE%') do (
       set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
     )
+    if [%SPARK_SUBMIT_PACKAGES%] NEQ [] (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
   )
 )

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c4110812e0a1..6bf9414570120 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -251,24 +251,19 @@ object SparkSubmit {
     }

     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
-
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val resolvedMavenCoordinates =
-      SparkSubmitUtils.resolveMavenCoordinates(
-        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-    if (!resolvedMavenCoordinates.trim.isEmpty) {
-      if (args.jars == null || args.jars.trim.isEmpty) {
-        args.jars = resolvedMavenCoordinates
+    val packagesResolved =
+      if (args.packagesResolved != null) {
+        // SparkSubmitDriverBootstrapper already downloaded the jars for us
+        args.packagesResolved
       } else {
-        args.jars += s",$resolvedMavenCoordinates"
+        SparkSubmitUtils.resolveMavenCoordinates(args.packages, Option(args.repositories),
+          Option(args.ivyRepoPath)).mkString(",")
       }
+
+    if (packagesResolved.nonEmpty) {
+      args.jars = mergeFileLists(args.jars, packagesResolved)
       if (args.isPython) {
-        if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
-          args.pyFiles = resolvedMavenCoordinates
-        } else {
-          args.pyFiles += s",$resolvedMavenCoordinates"
-        }
+        args.pyFiles = mergeFileLists(args.pyFiles, packagesResolved)
       }
     }

@@ -655,8 +650,7 @@ private[spark] object SparkSubmitUtils {

   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
    * @param coordinates Comma-delimited string of maven coordinates
    * @return Sequence of Maven coordinates
    */
@@ -721,17 +715,17 @@ private[spark] object SparkSubmitUtils {
    * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
    * @param artifacts Sequence of dependencies that were resolved and retrieved
    * @param cacheDirectory directory where jars are cached
-   * @return a comma-delimited list of paths for the dependencies
+   * @return A sequence of paths for the dependencies
    */
   private[spark] def resolveDependencyPaths(
       artifacts: Array[AnyRef],
-      cacheDirectory: File): String = {
+      cacheDirectory: File): Seq[String] = {
     artifacts.map { artifactInfo =>
       val artifactString = artifactInfo.toString
       val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
       cacheDirectory.getAbsolutePath + File.separator +
         jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
-    }.mkString(",")
+    }
   }

   /** Adds the given maven coordinates to Ivy's module descriptor. */
@@ -748,6 +742,35 @@ private[spark] object SparkSubmitUtils {
     }
   }
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
+
   /** A nice function to use in tests as well. Values are dummy strings.
    */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
     ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -757,17 +780,20 @@ private[spark] object SparkSubmitUtils {
    * @param coordinates Comma-delimited string of maven coordinates
    * @param remoteRepos Comma-delimited string of remote repositories other than maven central
    * @param ivyPath The path to the local ivy repository
-   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   * @return A sequence of paths to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
   private[spark] def resolveMavenCoordinates(
       coordinates: String,
       remoteRepos: Option[String],
       ivyPath: Option[String],
-      isTest: Boolean = false): String = {
+      isTest: Boolean = false): Seq[String] = {
     if (coordinates == null || coordinates.trim.isEmpty) {
-      ""
+      Seq.empty
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +837,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)

-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)

       // resolve dependencies
@@ -836,6 +852,7 @@ private[spark] object SparkSubmitUtils {
         packagesDirectory.getAbsolutePath + File.separator +
           "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 82e66a374249c..5ff3369e46921 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -50,6 +50,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
   var packages: String = null
+  var packagesResolved: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
   var verbose: Boolean = false
@@ -395,6 +396,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         packages = value
         parse(tail)

+      // SPARK-6031: Internal flag to receive the resolved maven jars and add them to --jars.
+      // This flag is only passed in by SparkSubmitDriverBootstrapper.
+      case ("--packages-resolved") :: value :: tail =>
+        packagesResolved = Utils.resolveURIs(value)
+        parse(tail)
+
       case ("--repositories") :: value :: tail =>
         repositories = value
         parse(tail)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 2eab9981845e8..c56ad49df3efa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -44,7 +44,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
       System.exit(1)
     }

-    val submitArgs = args
+    var submitArgs = args
     val runner = sys.env("RUNNER")
     val classpath = sys.env("CLASSPATH")
     val javaOpts = sys.env("JAVA_OPTS")
@@ -58,6 +58,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
     val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
     val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")
+    val submitPackages = sys.env.getOrElse("SPARK_SUBMIT_PACKAGES", "")
+    val submitRepositories = sys.env.get("SPARK_SUBMIT_REPOSITORIES")

     assume(runner != null, "RUNNER must be set")
     assume(classpath != null, "CLASSPATH must be set")
@@ -73,6 +75,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
     val confClasspath = properties.get("spark.driver.extraClassPath")
     val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
+    val confIvyRepo = properties.get("spark.jars.ivy")

     // Favor Spark submit arguments over the equivalent configs in the properties file.
     // Note that we do not actually use the Spark submit values for library path, classpath,
@@ -82,13 +85,25 @@ private[spark] object SparkSubmitDriverBootstrapper {
       .orElse(confDriverMemory)
       .getOrElse(defaultDriverMemory)

-    val newClasspath =
+    var newClasspath =
       if (submitClasspath.isDefined) {
         classpath
       } else {
         classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
       }

+    // Resolve maven dependencies if there are any and add them to the classpath. Also send them
+    // to SparkSubmit so that they can be shipped to the executors.
+    val resolvedMavenCoordinates =
+      SparkSubmitUtils.resolveMavenCoordinates(
+        submitPackages, submitRepositories, confIvyRepo)
+    if (resolvedMavenCoordinates.nonEmpty) {
+      newClasspath += sys.props("path.separator") +
+        resolvedMavenCoordinates.mkString(sys.props("path.separator"))
+      submitArgs =
+        Array("--packages-resolved", resolvedMavenCoordinates.mkString(",")) ++ submitArgs
+    }
+
     val newJavaOpts =
       if (submitJavaOpts.isDefined) {
         // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ad62b35f624f6..def7056e4ac0f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -91,7 +91,8 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
     val ivyPath = "dummy/ivy"
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
-    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
+    var jPaths =
+      SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath)).mkString(",")
     for (i <- 0 until 3) {
       val index = jPaths.indexOf(ivyPath)
       assert(index >= 0)
@@ -99,13 +100,13 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
     }
     // end to end
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
-      "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true)
+      "com.databricks:spark-csv_2.10:0.1", None, Option(ivyPath), true).mkString(",")
     assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
   }

   test("search for artifact at other repositories") {
     val path = SparkSubmitUtils.resolveMavenCoordinates("com.agimatec:agimatec-validation:0.9.3",
-      Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true)
+      Option("https://oss.sonatype.org/content/repositories/agimatec/"), None, true).mkString(",")
     assert(path.indexOf("agimatec-validation") >= 0, "should find package. If it doesn't, check" +
       "if package still exists. If it has been removed, replace the example in this test.")
   }
@@ -117,8 +118,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }

   test("neglects Spark and Spark's dependencies") {
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
     val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+      coordinates, None, None, true).mkString(",")
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. It will throw an error because the artifact
+    // doesn't exist, but the fact that Ivy tries to resolve it means it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
   }
 }
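
To make the hand-off above easier to follow, here is a minimal, hypothetical sketch (not part of the patch) of what the bootstrapper-side flow amounts to, assuming the Seq[String]-returning SparkSubmitUtils.resolveMavenCoordinates introduced in this diff and a caller placed in the org.apache.spark.deploy package (the method is private[spark]); the object name PackagesHandOffSketch is made up for illustration. The coordinates are resolved once, the resulting jar paths are joined with the platform path separator for the driver classpath, and the same list is forwarded to SparkSubmit through the internal --packages-resolved flag so the jars also reach --jars and --py-files.

// Hypothetical sketch only; mirrors the bootstrapper changes above, not a definitive implementation.
package org.apache.spark.deploy

object PackagesHandOffSketch {
  def main(args: Array[String]): Unit = {
    // The bootstrapper reads the raw --packages / --repositories values from the environment,
    // as exported by bin/spark-submit in this patch.
    val submitPackages = sys.env.getOrElse("SPARK_SUBMIT_PACKAGES", "")
    val submitRepositories = sys.env.get("SPARK_SUBMIT_REPOSITORIES")

    // Resolve the maven coordinates once, in the bootstrapper JVM.
    val resolvedJars: Seq[String] =
      SparkSubmitUtils.resolveMavenCoordinates(submitPackages, submitRepositories, None)

    if (resolvedJars.nonEmpty) {
      // Driver side: the jars are appended to the classpath using the platform separator.
      val classpathSuffix = resolvedJars.mkString(sys.props("path.separator"))
      // Executor side: the same jars are forwarded as a comma-delimited list via the internal
      // --packages-resolved flag, which SparkSubmit merges into --jars (and --py-files).
      val forwardedArgs = Array("--packages-resolved", resolvedJars.mkString(",")) ++ args
      println(s"classpath suffix: $classpathSuffix")
      println(s"forwarded args: ${forwardedArgs.mkString(" ")}")
    }
  }
}

This also shows why submitArgs and newClasspath become vars in SparkSubmitDriverBootstrapper: both need to be extended after the Ivy resolution completes.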