
Commit 6aed719

brkyvz authored and pwendell committed
[SPARK-5341] Use maven coordinates as dependencies in spark-shell and spark-submit
This PR adds support for using maven coordinates as dependencies to spark-shell. Coordinates can be provided as a comma-delimited string after the flag `--packages`. Additional remote repositories (like Sonatype) can be supplied as a comma-delimited string after the flag `--repositories`. Uses the Ivy library to resolve dependencies. Unfortunately the library has no decent documentation, therefore solving more complex dependency issues can be a problem. pwendell, mateiz, mengxr

**Note: This is still a WIP. The following need to be handled:**
- [x] add docs for the methods
- [x] take local ivy cache path as an argument
- [x] add tests
- [x] add Windows compatibility
- [x] exclude unused Ivy dependencies

Author: Burak Yavuz <[email protected]>

Closes apache#4215 from brkyvz/SPARK-5341ivy and squashes the following commits:

9215851 [Burak Yavuz] ready to merge
db2a5cc [Burak Yavuz] changed logging to printStream
9dae87f [Burak Yavuz] file separators changed
71c374d [Burak Yavuz] merge conflicts fixed
c08dc9f [Burak Yavuz] fixed merge conflicts
3ada19a [Burak Yavuz] fixed Jenkins error (hopefully) and added comment on oro
43c2290 [Burak Yavuz] fixed that ONE line
231f72f [Burak Yavuz] addressed code review
2cd6562 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-5341ivy
85ec5a3 [Burak Yavuz] added oro as a dependency explicitly
ea44ca4 [Burak Yavuz] add oro back to dependencies
cef0e24 [Burak Yavuz] IntelliJ is just messing things up
97c4a92 [Burak Yavuz] fix more weird IntelliJ formatting
9cf077d [Burak Yavuz] fix weird IntelliJ formatting
dcf5e13 [Burak Yavuz] fix windows command line flags
3a23f21 [Burak Yavuz] excluded ivy dependencies
53423e0 [Burak Yavuz] tests added
3705907 [Burak Yavuz] remove ivy-repo as a command line argument. Use global ivy cache as default
c04d885 [Burak Yavuz] take path to ivy cache as a conf
2edc9b5 [Burak Yavuz] managed to exclude Spark and it's dependencies
a0870af [Burak Yavuz] add docs. remove unnecesary new lines
6645af4 [Burak Yavuz] [SPARK-5341] added base implementation
882c4c8 [Burak Yavuz] added maven dependency download
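For example (the coordinate and repository URL here are placeholders, not part of the patch), the feature lets a user start a shell with extra dependencies via `bin/spark-shell --packages groupId:artifactId:version --repositories https://repo.example.com/maven`; the resolved jars and their transitive dependencies are then appended to the driver and executor classpaths, as described in the `--packages` help text added below.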
1 parent 83de71c commit 6aed719

File tree: 9 files changed, +404 −10 lines


bin/utils.sh

Lines changed: 3 additions & 3 deletions
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
     exit 1
   fi

-  # NOTE: If you add or remove spark-sumbmit options,
+  # NOTE: If you add or remove spark-submit options,
   # modify NOT ONLY this script but also SparkSubmitArgument.scala
   SUBMISSION_OPTS=()
   APPLICATION_OPTS=()
   while (($#)); do
     case "$1" in
-      --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
-      --conf | --properties-file | --driver-memory | --driver-java-options | \
+      --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
+      --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
      --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
      --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
        if [[ $# -lt 2 ]]; then

bin/windows-utils.cmd

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
 SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
+SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"

 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (

core/pom.xml

Lines changed: 11 additions & 0 deletions
@@ -241,6 +241,17 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+      <version>${ivy.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>oro</groupId>
+      <!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
+      <artifactId>oro</artifactId>
+      <version>${oro.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.tachyonproject</groupId>
      <artifactId>tachyon-client</artifactId>
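(The `${ivy.version}` and `${oro.version}` properties are presumably defined in the parent pom, which is among the changed files not shown in this excerpt.)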

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 1 deletion
@@ -1231,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
             null
           }
         } else {
-          env.httpFileServer.addJar(new File(uri.getPath))
+          try {
+            env.httpFileServer.addJar(new File(uri.getPath))
+          } catch {
+            case exc: FileNotFoundException =>
+              logError(s"Jar not found at $path")
+              null
+            case e: Exception =>
+              // For now just log an error but allow to go through so spark examples work.
+              // The spark examples don't really need the jar distributed since its also
+              // the app jar.
+              logError("Error adding jar (" + e + "), was the --addJars option used?")
+              null
+          }
         }
       // A JAR file which exists locally on every worker node
       case "local" =>

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 215 additions & 2 deletions
@@ -25,6 +25,17 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

 import org.apache.hadoop.fs.Path

+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
+import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
+
 import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
@@ -194,6 +205,18 @@ object SparkSubmit {
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"

+    // Resolve maven dependencies if there are any and add classpath to jars
+    val resolvedMavenCoordinates =
+      SparkSubmitUtils.resolveMavenCoordinates(
+        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
+    if (!resolvedMavenCoordinates.trim.isEmpty) {
+      if (args.jars == null || args.jars.trim.isEmpty) {
+        args.jars = resolvedMavenCoordinates
+      } else {
+        args.jars += s",$resolvedMavenCoordinates"
+      }
+    }
+
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
@@ -202,6 +225,7 @@ object SparkSubmit {
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -213,6 +237,7 @@ object SparkSubmit {

       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
       OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
@@ -384,8 +409,8 @@ object SparkSubmit {
       case e: ClassNotFoundException =>
         e.printStackTrace(printStream)
         if (childMainClass.contains("thriftserver")) {
-          println(s"Failed to load main class $childMainClass.")
-          println("You need to build Spark with -Phive and -Phive-thriftserver.")
+          printStream.println(s"Failed to load main class $childMainClass.")
+          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
         }
         System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
     }
@@ -475,6 +500,194 @@ object SparkSubmit {
   }
 }

+/** Provides utility functions to be used inside SparkSubmit. */
+private[spark] object SparkSubmitUtils {
+
+  // Exposed for testing
+  private[spark] var printStream = SparkSubmit.printStream
+
+  /**
+   * Represents a Maven Coordinate
+   * @param groupId the groupId of the coordinate
+   * @param artifactId the artifactId of the coordinate
+   * @param version the version of the coordinate
+   */
+  private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string
+   * @param coordinates Comma-delimited string of maven coordinates
+   * @return Sequence of Maven coordinates
+   */
+  private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
+    coordinates.split(",").map { p =>
+      val splits = p.split(":")
+      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
+        s"'groupId:artifactId:version'. The coordinate provided is: $p")
+      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
+        s"be whitespace. The groupId provided is: ${splits(0)}")
+      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
+        s"be whitespace. The artifactId provided is: ${splits(1)}")
+      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
+        s"be whitespace. The version provided is: ${splits(2)}")
+      new MavenCoordinate(splits(0), splits(1), splits(2))
+    }
+  }
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string
+   * @param remoteRepos Comma-delimited string of remote repositories
+   * @return A ChainResolver used by Ivy to search for and resolve dependencies.
+   */
+  private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
+    // We need a chain resolver if we want to check multiple repositories
+    val cr = new ChainResolver
+    cr.setName("list")
+
+    // the biblio resolver resolves POM declared dependencies
+    val br: IBiblioResolver = new IBiblioResolver
+    br.setM2compatible(true)
+    br.setUsepoms(true)
+    br.setName("central")
+    cr.add(br)
+
+    val repositoryList = remoteRepos.getOrElse("")
+    // add any other remote repositories other than maven central
+    if (repositoryList.trim.nonEmpty) {
+      repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
+        val brr: IBiblioResolver = new IBiblioResolver
+        brr.setM2compatible(true)
+        brr.setUsepoms(true)
+        brr.setRoot(repo)
+        brr.setName(s"repo-${i + 1}")
+        cr.add(brr)
+        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+      }
+    }
+    cr
+  }
+
+  /**
+   * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
+   * (will append to jars in SparkSubmit). The name of the jar is given
+   * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
+   * @param artifacts Sequence of dependencies that were resolved and retrieved
+   * @param cacheDirectory directory where jars are cached
+   * @return a comma-delimited list of paths for the dependencies
+   */
+  private[spark] def resolveDependencyPaths(
+      artifacts: Array[AnyRef],
+      cacheDirectory: File): String = {
+    artifacts.map { artifactInfo =>
+      val artifactString = artifactInfo.toString
+      val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
+      cacheDirectory.getAbsolutePath + File.separator +
+        jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
+    }.mkString(",")
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  private[spark] def addDependenciesToIvy(
+      md: DefaultModuleDescriptor,
+      artifacts: Seq[MavenCoordinate],
+      ivyConfName: String): Unit = {
+    artifacts.foreach { mvn =>
+      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+      val dd = new DefaultDependencyDescriptor(ri, false, false)
+      dd.addDependencyConfiguration(ivyConfName, ivyConfName)
+      printStream.println(s"${dd.getDependencyId} added as a dependency")
+      md.addDependency(dd)
+    }
+  }
+
+  /** A nice function to use in tests as well. Values are dummy strings. */
+  private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
+    ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
+
+  /**
+   * Resolves any dependencies that were supplied through maven coordinates
+   * @param coordinates Comma-delimited string of maven coordinates
+   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+   * @param ivyPath The path to the local ivy repository
+   * @return The comma-delimited path to the jars of the given maven artifacts including their
+   *         transitive dependencies
+   */
+  private[spark] def resolveMavenCoordinates(
+      coordinates: String,
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      isTest: Boolean = false): String = {
+    if (coordinates == null || coordinates.trim.isEmpty) {
+      ""
+    } else {
+      val artifacts = extractMavenCoordinates(coordinates)
+      // Default configuration name for ivy
+      val ivyConfName = "default"
+      // set ivy settings for location of cache
+      val ivySettings: IvySettings = new IvySettings
+      // Directories for caching downloads through ivy and storing the jars when maven coordinates
+      // are supplied to spark-submit
+      val alternateIvyCache = ivyPath.getOrElse("")
+      val packagesDirectory: File =
+        if (alternateIvyCache.trim.isEmpty) {
+          new File(ivySettings.getDefaultIvyUserDir, "jars")
+        } else {
+          ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
+          new File(alternateIvyCache, "jars")
+        }
+      printStream.println(
+        s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
+      printStream.println(s"The jars for the packages stored in: $packagesDirectory")
+      // create a pattern matcher
+      ivySettings.addMatcher(new GlobPatternMatcher)
+      // create the dependency resolvers
+      val repoResolver = createRepoResolvers(remoteRepos)
+      ivySettings.addResolver(repoResolver)
+      ivySettings.setDefaultResolver(repoResolver.getName)
+
+      val ivy = Ivy.newInstance(ivySettings)
+      // Set resolve options to download transitive dependencies as well
+      val resolveOptions = new ResolveOptions
+      resolveOptions.setTransitive(true)
+      val retrieveOptions = new RetrieveOptions
+      // Turn downloading and logging off for testing
+      if (isTest) {
+        resolveOptions.setDownload(false)
+        resolveOptions.setLog(LogOptions.LOG_QUIET)
+        retrieveOptions.setLog(LogOptions.LOG_QUIET)
+      } else {
+        resolveOptions.setDownload(true)
+      }
+
+      // A Module descriptor must be specified. Entries are dummy strings
+      val md = getModuleDescriptor
+      md.setDefaultConf(ivyConfName)
+
+      // Add an exclusion rule for Spark
+      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
+      md.addExcludeRule(sparkDependencyExcludeRule)
+      addDependenciesToIvy(md, artifacts, ivyConfName)
+
+      // resolve dependencies
+      val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+      if (rr.hasError) {
+        throw new RuntimeException(rr.getAllProblemMessages.toString)
+      }
+      // retrieve all resolved dependencies
+      ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
+        packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
+        retrieveOptions.setConfs(Array(ivyConfName)))
+
+      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+    }
+  }
+}
+
 /**
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
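A minimal sketch (not part of the patch) of how the new helpers compose. The coordinate, repository URL, and cache path below are placeholders, and because the helpers are private[spark], code like this would have to live under the org.apache.spark.deploy package, e.g. in a test suite:

package org.apache.spark.deploy

object SparkSubmitUtilsSketch {
  def main(args: Array[String]): Unit = {
    // Parse a "groupId:artifactId:version" string into a MavenCoordinate.
    val coords = SparkSubmitUtils.extractMavenCoordinates("com.example:demo-lib:1.0")
    coords.foreach(c => println(s"${c.groupId} / ${c.artifactId} / ${c.version}"))

    // Resolve through Ivy. isTest = true turns off downloading and quiets Ivy's logging,
    // mirroring how the tests added in this PR exercise the code path.
    val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
      "com.example:demo-lib:1.0",
      remoteRepos = Some("https://repo.example.com/maven"),
      ivyPath = Some("/tmp/ivy-demo"),
      isTest = true)
    println(s"Comma-delimited classpath entries: $jarPaths")
  }
}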

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 21 additions & 0 deletions
@@ -50,6 +50,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var name: String = null
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
+  var packages: String = null
+  var repositories: String = null
+  var ivyRepoPath: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -123,6 +126,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orNull
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
+    ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
@@ -212,6 +216,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     |  name                  $name
     |  childArgs             [${childArgs.mkString(" ")}]
     |  jars                  $jars
+    |  packages              $packages
+    |  repositories          $repositories
     |  verbose               $verbose
     |
     |Spark properties used, including those specified through
@@ -318,6 +324,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         jars = Utils.resolveURIs(value)
         parse(tail)

+      case ("--packages") :: value :: tail =>
+        packages = value
+        parse(tail)
+
+      case ("--repositories") :: value :: tail =>
+        repositories = value
+        parse(tail)
+
       case ("--conf" | "-c") :: value :: tail =>
         value.split("=", 2).toSeq match {
           case Seq(k, v) => sparkProperties(k) = v
@@ -368,6 +382,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
        |  --name NAME                 A name of your application.
        |  --jars JARS                 Comma-separated list of local jars to include on the driver
        |                              and executor classpaths.
+       |  --packages                  Comma-separated list of maven coordinates of jars to include
+       |                              on the driver and executor classpaths. Will search the local
+       |                              maven repo, then maven central and any additional remote
+       |                              repositories given by --repositories. The format for the
+       |                              coordinates should be groupId:artifactId:version.
+       |  --repositories              Comma-separated list of additional remote repositories to
+       |                              search for the maven coordinates given with --packages.
        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
        |                              on the PYTHONPATH for Python apps.
        |  --files FILES               Comma-separated list of files to be placed in the working
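The new flags are handled by the same recursive list-pattern parse as the existing options. A self-contained, simplified sketch of that parsing style (not Spark code; the coordinate and repository URL are placeholders):

object MiniParse {
  var packages: String = null
  var repositories: String = null

  // Walk the argument list, consuming "--flag value" pairs the way
  // SparkSubmitArguments.parse does for --packages and --repositories.
  def parse(opts: List[String]): Unit = opts match {
    case "--packages" :: value :: tail =>
      packages = value
      parse(tail)
    case "--repositories" :: value :: tail =>
      repositories = value
      parse(tail)
    case _ :: tail => parse(tail)   // ignore everything else in this sketch
    case Nil =>
  }

  def main(args: Array[String]): Unit = {
    parse(List("--packages", "com.example:demo-lib:1.0",
               "--repositories", "https://repo.example.com/maven"))
    println(s"packages=$packages repositories=$repositories")
  }
}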
