90 changes: 65 additions & 25 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,10 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.io.{File, IOException, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
import java.text.ParseException

import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -291,8 +292,12 @@ object SparkSubmit {
} else {
Nil
}

val ivySettings = Option(args.ivySettingsFile).map(SparkSubmitUtils.loadIvySettings).getOrElse(
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath)))

val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
@@ -445,6 +450,8 @@ object SparkSubmit {
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.ivySettingsFile, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.ivy.settings"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -975,23 +982,71 @@ private[spark] object SparkSubmitUtils {
}
}

/**
* Build an IvySettings instance that uses the default resolvers plus any additional
* remote repositories, optionally relocating the local Ivy cache
* @param remoteRepos Comma-delimited string of remote repositories other than Maven Central
* @param ivyPath The path to the local Ivy repository
* @return An IvySettings object
*/
def buildIvySettings(
remoteRepos: Option[String],
ivyPath: Option[String]): IvySettings = {
val ivySettings: IvySettings = new IvySettings

// set ivy settings for location of cache: keep Ivy's default user dir unless an
// alternate cache path was supplied
ivyPath.map(_.trim).filter(_.nonEmpty).foreach { alternateIvyCache =>
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
}

// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)
ivySettings
}
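
For illustration, a minimal usage sketch of this entry point (the repository URL, cache path, and coordinate below are hypothetical placeholders, not values from this change):

// Build settings with one extra remote resolver and a relocated Ivy cache,
// then resolve a coordinate against them.
val settings = SparkSubmitUtils.buildIvySettings(
  remoteRepos = Some("https://repo.example.com/maven2"), // hypothetical repository
  ivyPath = Some("/tmp/custom-ivy")) // hypothetical alternate cache directory
// Returns a comma-delimited list of jar paths, including transitive dependencies.
val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
  "com.example:mylib:1.0", // hypothetical coordinate
  settings,
  exclusions = Nil)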

/**
* Load Ivy settings from the given settings file
* @param settingsFile Path to the Ivy settings file
* @return An IvySettings object
*/
def loadIvySettings(settingsFile: String): IvySettings = {
val file = new File(settingsFile)
require(file.exists(), s"Ivy settings file $file does not exist")
require(file.isFile(), s"Ivy settings file $file is not a normal file")
val ivySettings: IvySettings = new IvySettings
try {
ivySettings.load(file)
} catch {
case e @ (_: IOException | _: ParseException) =>
throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
}
ivySettings
}
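
Correspondingly, a sketch of the explicit-settings path (the settings path and coordinate are hypothetical examples): a missing or non-regular file fails fast via require, and Ivy's IOException or ParseException is rethrown as a SparkException.

// Resolution then follows whatever resolvers the hypothetical file defines.
val settings = SparkSubmitUtils.loadIvySettings("/etc/spark/ivysettings.xml")
val jarPaths = SparkSubmitUtils.resolveMavenCoordinates("com.example:mylib:1.0", settings)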

/** A nice function to use in tests as well. Values are dummy strings. */
def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))

/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @param ivySettings An IvySettings containing resolvers to use
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
ivySettings: IvySettings,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
@@ -1002,32 +1057,14 @@ private[spark] object SparkSubmitUtils {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
@@ -1043,6 +1080,9 @@
resolveOptions.setDownload(true)
}

// Default configuration name for ivy
val ivyConfName = "default"

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
// clear ivy resolution from previous launches. The resolution file is usually at
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -60,6 +60,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var packages: String = null
var repositories: String = null
var ivyRepoPath: String = null
var ivySettingsFile: String = null
var packagesExclusions: String = null
var verbose: Boolean = false
var isPython: Boolean = false
@@ -175,6 +176,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
ivySettingsFile = sparkProperties.get("spark.ivy.settings").orNull
packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
packagesExclusions = Option(packagesExclusions)
.orElse(sparkProperties.get("spark.jars.excludes")).orNull
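
End to end, the new property would typically be supplied at submit time; a sketch (class name, paths, and coordinate are hypothetical placeholders):

spark-submit \
  --class com.example.Main \
  --conf spark.ivy.settings=/path/to/ivysettings.xml \
  --packages com.example:mylib:1.0 \
  app.jar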
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -126,8 +126,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
Option(tempIvyPath), isTest = true)
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Option(repo), Option(tempIvyPath)),
isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -137,7 +139,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val dep = "my.great.dep:mydep:0.5"
// Local M2 repository
IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -146,7 +150,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val settings = new IvySettings
val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -156,8 +162,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
settings.setDefaultIvyUserDir(new File(tempIvyPath))
IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
ivySettings = settings) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
Some(tempIvyPath), isTest = true)
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -166,7 +174,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

test("dependency not found throws RuntimeException") {
intercept[RuntimeException] {
SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
SparkSubmitUtils.resolveMavenCoordinates(
"a:b:c",
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
}
}

@@ -178,12 +189,17 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"

val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
val path = SparkSubmitUtils.resolveMavenCoordinates(
coordinates,
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
Some(repo), None, isTest = true)
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,
SparkSubmitUtils.buildIvySettings(Some(repo), None),
isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
@@ -192,8 +208,11 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = "my.great.dep:mydep:0.5"
IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
val files = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Some(repo), None),
Seq("my.great.dep:mydep"),
isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
}
17 changes: 14 additions & 3 deletions docs/configuration.md
@@ -406,9 +406,10 @@ Apart from these, the following properties are also available, and may be useful
<td></td>
<td>
Comma-separated list of maven coordinates of jars to include on the driver and executor
classpaths. Will search the local maven repo, then maven central and any additional remote
repositories given by <code>spark.jars.ivy</code>. The format for the coordinates should be
groupId:artifactId:version.
classpaths. If <code>spark.ivy.settings</code> is given, artifacts will be resolved according
to the configuration in that file; otherwise artifacts will be searched for in the local Maven
repository, then Maven Central, and finally any additional remote repositories given by the
command-line option <code>--repositories</code>. The format for the coordinates should be
groupId:artifactId:version.
</td>
</tr>
<tr>
@@ -427,6 +428,16 @@ Apart from these, the following properties are also available, and may be useful
with <code>spark.jars.packages</code>.
</td>
</tr>
<tr>
<td><code>spark.ivy.settings</code></td>
<td></td>
<td>
Path to an Ivy settings file to customize resolution of jars specified using <code>spark.jars.packages</code>.
When set, it overrides <code>spark.jars.ivy</code>, so the two should not be used together. This is useful
for allowing Spark to resolve artifacts from behind a firewall, e.g. via an in-house artifact server such
as Artifactory. Details on the settings file format can be found at
http://ant.apache.org/ivy/history/latest-milestone/settings.html
</tr>
<tr>
<td><code>spark.pyspark.driver.python</code></td>
<td></td>
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -112,8 +112,9 @@ private[hive] object IsolatedClientLoader extends Logging {
val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
Some("http://www.datanucleus.org/downloads/maven2"),
ivyPath,
SparkSubmitUtils.buildIvySettings(
Some("http://www.datanucleus.org/downloads/maven2"),
ivyPath),
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet