From c60156dd67900db25f7071642f96afde9461c8c7 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 14 Feb 2015 11:08:52 -0800 Subject: [PATCH 1/8] [SPARK-5811] Added documentation for maven coordinates --- .../org/apache/spark/deploy/SparkSubmit.scala | 43 +++++++++++++------ docs/programming-guide.md | 19 ++++++-- docs/submitting-applications.md | 5 +++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 54399e99c98f0..2e47441f3060e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -253,6 +253,24 @@ object SparkSubmit { val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER + // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files + // too for Spark Package compatibility. + val resolvedMavenCoordinates = + SparkSubmitUtils.resolveMavenCoordinates( + args.packages, Option(args.repositories), Option(args.ivyRepoPath)) + if (!resolvedMavenCoordinates.trim.isEmpty) { + if (args.jars == null || args.jars.trim.isEmpty) { + args.jars = resolvedMavenCoordinates + } else { + args.jars += s",$resolvedMavenCoordinates" + } + if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + args.pyFiles = resolvedMavenCoordinates + } else { + args.pyFiles += s",$resolvedMavenCoordinates" + } + } + // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local if (args.isPython && !isYarnCluster) { @@ -309,18 +327,6 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" - // Resolve maven dependencies if there are any and add classpath to jars - val resolvedMavenCoordinates = - SparkSubmitUtils.resolveMavenCoordinates( - args.packages, Option(args.repositories), Option(args.ivyRepoPath)) - if (!resolvedMavenCoordinates.trim.isEmpty) { - if (args.jars == null || args.jars.trim.isEmpty) { - args.jars = resolvedMavenCoordinates - } else { - args.jars += s",$resolvedMavenCoordinates" - } - } - // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( @@ -648,13 +654,15 @@ private[spark] object SparkSubmitUtils { private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** - * Extracts maven coordinates from a comma-delimited string + * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided + * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides + * simplicity for Spark Package users. * @param coordinates Comma-delimited string of maven coordinates * @return Sequence of Maven coordinates */ private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = { coordinates.split(",").map { p => - val splits = p.split(":") + val splits = p.replace("/", ":").split(":") require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + s"'groupId:artifactId:version'. 
The coordinate provided is: $p") require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + @@ -684,6 +692,13 @@ private[spark] object SparkSubmitUtils { br.setName("central") cr.add(br) + val sp: IBiblioResolver = new IBiblioResolver + sp.setM2compatible(true) + sp.setUsepoms(true) + sp.setRoot("http://dl.bintray.com/spark-packages/maven") + sp.setName("spark-packages") + cr.add(sp) + val repositoryList = remoteRepos.getOrElse("") // add any other remote repositories other than maven central if (repositoryList.trim.nonEmpty) { diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 118701549a759..4e4af76316863 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -173,8 +173,11 @@ in-process. In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add JARs to the classpath -by passing a comma-separated list to the `--jars` argument. -For example, to run `bin/spark-shell` on exactly four cores, use: +by passing a comma-separated list to the `--jars` argument. You can also add dependencies +(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates +to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) +can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly +four cores, use: {% highlight bash %} $ ./bin/spark-shell --master local[4] @@ -186,6 +189,12 @@ Or, to also add `code.jar` to its classpath, use: $ ./bin/spark-shell --master local[4] --jars code.jar {% endhighlight %} +To include a dependency using maven coordinates: + +{% highlight bash %} +$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" +{% endhighlight %} + For a complete list of options, run `spark-shell --help`. Behind the scenes, `spark-shell` invokes the more general [`spark-submit` script](submitting-applications.html). @@ -196,7 +205,11 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes, In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files -to the runtime path by passing a comma-separated list to `--py-files`. +to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies +(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates +to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) +can be passed to the `--repositories` argument. Any python dependencies a Spark Package has (listed in +the requirements.txt of that package) must be manually installed using pip when necessary. 
For example, to run `bin/pyspark` on exactly four cores, use: {% highlight bash %} diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 14a87f8436984..57b074778f2b0 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -174,6 +174,11 @@ This can use up a significant amount of space over time and will need to be clea is handled automatically, and with Spark standalone, automatic cleanup can be configured with the `spark.worker.cleanup.appDataTtl` property. +Users may also include any other dependencies by supplying a comma-delimited list of maven coordinates +with `--packages`. All transitive dependencies will be handled when using this command. Additional +repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`. +These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages. + For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. From 17d3f76fa4a542a0e47fc60579b17409ebeebc7d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 17 Feb 2015 10:09:21 -0800 Subject: [PATCH 2/8] support .jar as python package --- python/pyspark/context.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index bf1f61c8504ed..40b3152b23843 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -64,6 +64,8 @@ class SparkContext(object): _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH + PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): @@ -185,7 +187,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) - if filename.lower().endswith("zip") or filename.lower().endswith("egg"): + if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) @@ -705,7 +707,7 @@ def addPyFile(self, path): self.addFile(path) (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix - if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): + if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) From 560d13bee3be24104d2725a31e8036d9b8448e08 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 10:47:06 -0800 Subject: [PATCH 3/8] before PR --- .../org/apache/spark/deploy/SparkSubmit.scala | 17 ++++++++++++----- python/pyspark/context.py | 3 ++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 78913cc2c84af..f598cd93f87f2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -263,10 +263,12 @@ object SparkSubmit { } else { args.jars += s",$resolvedMavenCoordinates" } - if (args.pyFiles == 
null || args.pyFiles.trim.isEmpty) { - args.pyFiles = resolvedMavenCoordinates - } else { - args.pyFiles += s",$resolvedMavenCoordinates" + if (args.isPython) { + if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + args.pyFiles = resolvedMavenCoordinates + } else { + args.pyFiles += s",$resolvedMavenCoordinates" + } } } @@ -809,14 +811,19 @@ private[spark] object SparkSubmitUtils { val md = getModuleDescriptor md.setDefaultConf(ivyConfName) - // Add an exclusion rule for Spark + // Add an exclusion rule for Spark and Scala Library val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*") val sparkDependencyExcludeRule = new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null) sparkDependencyExcludeRule.addConfiguration(ivyConfName) + val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*") + val scalaDependencyExcludeRule = + new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null) + scalaDependencyExcludeRule.addConfiguration(ivyConfName) // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies md.addExcludeRule(sparkDependencyExcludeRule) + md.addExcludeRule(scalaDependencyExcludeRule) addDependenciesToIvy(md, artifacts, ivyConfName) // resolve dependencies diff --git a/python/pyspark/context.py b/python/pyspark/context.py index bf1f61c8504ed..d3846f5a2e69c 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -185,7 +185,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) - if filename.lower().endswith("zip") or filename.lower().endswith("egg"): + if filename.lower().endswith("zip") or filename.lower().endswith("egg") \ + or filename.lower().endswith("jar"): self._python_includes.append(filename) sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) From 4ef4046261fe43f59523f3fb84c593b7ebefe3e8 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 13:05:01 -0800 Subject: [PATCH 4/8] ready for PR --- python/pyspark/tests.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index d5d67381e1843..b42bd266da402 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1451,7 +1451,7 @@ def create_spark_package(self, artifact_name): | """ % (group_id, artifact_id, version)).lstrip(), os.path.join(group_id, artifact_id, version)) - self.createFileInZip("%s/%s.py" % (artifact_id, artifact_id), """ + self.createFileInZip("%s.py" % artifact_id, """ |def myfunc(x): | return x + 1 """, ".jar", os.path.join(group_id, artifact_id, version), @@ -1539,8 +1539,6 @@ def test_package_dependency(self): "file:" + self.programDir, script], stdout=subprocess.PIPE) out, err = proc.communicate() - print "\n\nout: %s\n\n" % out - print "\n\nerr: %s\n\n" % err self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) From a8bd6b789e3f37a38d0fa9456d9d4c30b1a8d56d Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 13:13:04 -0800 Subject: [PATCH 5/8] submit PR --- .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 2 +- python/pyspark/tests.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala 
index 53665350013cd..4dc5da6639a2d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -58,7 +58,7 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll { test("create repo resolvers") { val resolver1 = SparkSubmitUtils.createRepoResolvers(None) // should have central by default - assert(resolver1.getResolvers.size() === 1) + assert(resolver1.getResolvers.size() === 2) assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central") val repos = "a/1,b/2,c/3" diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index b42bd266da402..c3a405f5654cf 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1553,8 +1553,8 @@ def test_package_dependency_on_cluster(self): """) self.create_spark_package("a:mylib:0.1") proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", - "file:" + self.programDir, "--master", "local-cluster[1,1,512]", script], - stdout=subprocess.PIPE) + "file:" + self.programDir, "--master", + "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) From c07b81e0be3befc0c5ebf7eaee3f640eb904afc5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 13:30:55 -0800 Subject: [PATCH 6/8] fixed pep8 --- python/pyspark/tests.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c3a405f5654cf..0fdc1b8d6cf65 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1435,26 +1435,26 @@ def createFileInZip(self, name, content, ext=".zip", dir=None, zip_name=None): zip.writestr(name, content) zip.close() return path - + def create_spark_package(self, artifact_name): group_id, artifact_id, version = artifact_name.split(":") self.createTempFile("%s-%s.pom" % (artifact_id, version), (""" | | | 4.0.0 | %s | %s | %s | - """ % (group_id, artifact_id, version)).lstrip(), - os.path.join(group_id, artifact_id, version)) + """ % (group_id, artifact_id, version)).lstrip(), + os.path.join(group_id, artifact_id, version)) self.createFileInZip("%s.py" % artifact_id, """ |def myfunc(x): | return x + 1 - """, ".jar", os.path.join(group_id, artifact_id, version), + """, ".jar", os.path.join(group_id, artifact_id, version), "%s-%s" % (artifact_id, version)) def test_single_script(self): @@ -1537,7 +1537,6 @@ def test_package_dependency(self): self.create_spark_package("a:mylib:0.1") proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", "file:" + self.programDir, script], stdout=subprocess.PIPE) - out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) @@ -1553,7 +1552,7 @@ def test_package_dependency_on_cluster(self): """) self.create_spark_package("a:mylib:0.1") proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories", - "file:" + self.programDir, "--master", + "file:" + self.programDir, "--master", "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) From 64cb8ee191ffc3059f8d43222bee893e87a022bf Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 13:38:16 -0800 Subject: [PATCH 7/8] passed pep8 on local --- python/pyspark/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff 
--git a/python/pyspark/tests.py b/python/pyspark/tests.py index 0fdc1b8d6cf65..25565733ca9fc 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1449,8 +1449,8 @@ def create_spark_package(self, artifact_name): | %s | %s | - """ % (group_id, artifact_id, version)).lstrip(), - os.path.join(group_id, artifact_id, version)) + """ % (group_id, artifact_id, version)).lstrip(), + os.path.join(group_id, artifact_id, version)) self.createFileInZip("%s.py" % artifact_id, """ |def myfunc(x): | return x + 1 From 56ccccd2b9d765345c4ec71fb4492f7de7e90784 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 17 Feb 2015 14:40:59 -0800 Subject: [PATCH 8/8] fixed broken test --- .../apache/spark/deploy/SparkSubmitUtilsSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 4dc5da6639a2d..ad62b35f624f6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -57,20 +57,23 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll { test("create repo resolvers") { val resolver1 = SparkSubmitUtils.createRepoResolvers(None) - // should have central by default + // should have central and spark-packages by default assert(resolver1.getResolvers.size() === 2) assert(resolver1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "central") + assert(resolver1.getResolvers.get(1).asInstanceOf[IBiblioResolver].getName === "spark-packages") val repos = "a/1,b/2,c/3" val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos)) - assert(resolver2.getResolvers.size() === 4) + assert(resolver2.getResolvers.size() === 5) val expected = repos.split(",").map(r => s"$r/") resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: IBiblioResolver, i) => if (i == 0) { assert(resolver.getName === "central") + } else if (i == 1) { + assert(resolver.getName === "spark-packages") } else { - assert(resolver.getName === s"repo-$i") - assert(resolver.getRoot === expected(i - 1)) + assert(resolver.getName === s"repo-${i - 1}") + assert(resolver.getRoot === expected(i - 2)) } } }
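Taken together, these patches expose `--packages` and `--repositories` for `spark-submit`, `spark-shell`, and `pyspark`, with Maven Central and the spark-packages repository searched by default and Spark itself plus scala-library excluded from the resolved dependency tree. A minimal usage sketch follows; it reuses the illustrative coordinate `org.example:example:0.1` from the docs change, while the repository URL and application file name are placeholders rather than values taken from these patches:

    # Pull a package (plus its transitive dependencies) into a shell session.
    $ ./bin/spark-shell --packages "org.example:example:0.1"
    $ ./bin/pyspark --packages "org.example:example:0.1"

    # For spark-submit, extra resolvers can be supplied with --repositories.
    # (placeholder repository URL and script name)
    $ ./bin/spark-submit \
        --packages "org.example:example:0.1" \
        --repositories "https://repo.example.com/maven" \
        --master local[4] \
        my_app.py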