diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 85f80b6971e80..a980144a75953 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,10 +17,11 @@
package org.apache.spark.deploy
-import java.io.{File, PrintStream}
+import java.io.{File, IOException, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
+import java.text.ParseException
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -283,8 +284,17 @@ object SparkSubmit extends CommandLineUtils {
} else {
Nil
}
+
+ // Create the IvySettings, either load from file or build defaults
+ val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+ SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+ Option(args.ivyRepoPath))
+ }.getOrElse {
+ SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+ }
+
val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
- Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
+ ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
@@ -860,30 +870,13 @@ private[spark] object SparkSubmitUtils {
/**
* Creates the default chain of repository resolvers (local M2 cache, local Ivy cache, Maven Central, spark-packages)
- * @param remoteRepos Comma-delimited string of remote repositories
- * @param ivySettings The Ivy settings for this session
+ * @param defaultIvyUserDir The default user path for Ivy
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
- def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
+ def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
- cr.setName("list")
-
- val repositoryList = remoteRepos.getOrElse("")
- // add any other remote repositories other than maven central
- if (repositoryList.trim.nonEmpty) {
- repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
- val brr: IBiblioResolver = new IBiblioResolver
- brr.setM2compatible(true)
- brr.setUsepoms(true)
- brr.setRoot(repo)
- brr.setName(s"repo-${i + 1}")
- cr.add(brr)
- // scalastyle:off println
- printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
- // scalastyle:on println
- }
- }
+ cr.setName("spark-list")
val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
@@ -893,7 +886,7 @@ private[spark] object SparkSubmitUtils {
cr.add(localM2)
val localIvy = new FileSystemResolver
- val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
+ val localIvyRoot = new File(defaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
@@ -974,6 +967,87 @@ private[spark] object SparkSubmitUtils {
}
}
+ /**
+   * Build IvySettings from the given options, using the default repository resolvers
+ * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+ * @param ivyPath The path to the local ivy repository
+ * @return An IvySettings object
+ */
+ def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+ val ivySettings: IvySettings = new IvySettings
+ processIvyPathArg(ivySettings, ivyPath)
+
+ // create a pattern matcher
+ ivySettings.addMatcher(new GlobPatternMatcher)
+ // create the dependency resolvers
+ val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+ ivySettings.addResolver(repoResolver)
+ ivySettings.setDefaultResolver(repoResolver.getName)
+ processRemoteRepoArg(ivySettings, remoteRepos)
+ ivySettings
+ }
+
+ /**
+   * Load Ivy settings from the given settings file, then apply any supplied repositories and Ivy path
+ * @param settingsFile Path to Ivy settings file
+ * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+ * @param ivyPath The path to the local ivy repository
+ * @return An IvySettings object
+ */
+ def loadIvySettings(
+ settingsFile: String,
+ remoteRepos: Option[String],
+ ivyPath: Option[String]): IvySettings = {
+ val file = new File(settingsFile)
+ require(file.exists(), s"Ivy settings file $file does not exist")
+ require(file.isFile(), s"Ivy settings file $file is not a normal file")
+ val ivySettings: IvySettings = new IvySettings
+ try {
+ ivySettings.load(file)
+ } catch {
+ case e @ (_: IOException | _: ParseException) =>
+ throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
+ }
+ processIvyPathArg(ivySettings, ivyPath)
+ processRemoteRepoArg(ivySettings, remoteRepos)
+ ivySettings
+ }
+
+ /* Set ivy settings for location of cache, if option is supplied */
+ private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
+ ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
+ ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
+ ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
+ }
+ }
+
+ /* Add any optional additional remote repositories */
+ private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
+ remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
+ val cr = new ChainResolver
+ cr.setName("user-list")
+
+ // add current default resolver, if any
+ Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
+ // add additional repositories, last resolution in chain takes precedence
+ repositoryList.zipWithIndex.foreach { case (repo, i) =>
+ val brr: IBiblioResolver = new IBiblioResolver
+ brr.setM2compatible(true)
+ brr.setUsepoms(true)
+ brr.setRoot(repo)
+ brr.setName(s"repo-${i + 1}")
+ cr.add(brr)
+ // scalastyle:off println
+ printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
+ // scalastyle:on println
+ }
+
+ ivySettings.addResolver(cr)
+ ivySettings.setDefaultResolver(cr.getName)
+ }
+ }
+
/** A nice function to use in tests as well. Values are dummy strings. */
def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
@@ -981,16 +1055,14 @@ private[spark] object SparkSubmitUtils {
/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
- * @param remoteRepos Comma-delimited string of remote repositories other than maven central
- * @param ivyPath The path to the local ivy repository
+ * @param ivySettings An IvySettings containing resolvers to use
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
- remoteRepos: Option[String],
- ivyPath: Option[String],
+ ivySettings: IvySettings,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
@@ -1001,32 +1073,14 @@ private[spark] object SparkSubmitUtils {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
- // Default configuration name for ivy
- val ivyConfName = "default"
- // set ivy settings for location of cache
- val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
- val alternateIvyCache = ivyPath.getOrElse("")
- val packagesDirectory: File =
- if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
- new File(ivySettings.getDefaultIvyUserDir, "jars")
- } else {
- ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
- ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
- new File(alternateIvyCache, "jars")
- }
+ val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
- // create a pattern matcher
- ivySettings.addMatcher(new GlobPatternMatcher)
- // create the dependency resolvers
- val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
- ivySettings.addResolver(repoResolver)
- ivySettings.setDefaultResolver(repoResolver.getName)
val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
@@ -1042,6 +1096,9 @@ private[spark] object SparkSubmitUtils {
resolveOptions.setDownload(true)
}
+ // Default configuration name for ivy
+ val ivyConfName = "default"
+
// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
// clear ivy resolution from previous launches. The resolution file is usually at
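Taken together, the SparkSubmit changes above funnel package resolution through two entry points: buildIvySettings for the default behaviour and loadIvySettings when spark.jars.ivySettings is set. A minimal sketch of the resulting call flow, using only the signatures introduced in this patch; the repository URL, settings path and coordinate are placeholders, and the calls assume code inside the org.apache.spark.deploy package since SparkSubmitUtils is private[spark]:

// Sketch (not part of the patch): placeholder values throughout.
val defaultSettings = SparkSubmitUtils.buildIvySettings(
  remoteRepos = Some("https://repo.example.com/maven"),  // e.g. value of --repositories
  ivyPath = Some("/tmp/ivy"))                            // e.g. value of spark.jars.ivy

val fileSettings = SparkSubmitUtils.loadIvySettings(
  "/path/to/ivysettings.xml",                            // e.g. value of spark.jars.ivySettings
  remoteRepos = None,
  ivyPath = None)

// Whichever settings object was produced is handed to the resolver unchanged.
val resolvedJars = SparkSubmitUtils.resolveMavenCoordinates(
  "com.example:somelib:1.0",                             // hypothetical --packages coordinate
  fileSettings,
  exclusions = Nil)

Either settings object carries its own resolvers, which is what lets a user-provided settings file completely replace the built-in chain.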
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 4877710c1237d..266c9d33b5a96 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -18,12 +18,14 @@
package org.apache.spark.deploy
import java.io.{File, OutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
+import com.google.common.io.Files
import org.apache.ivy.core.module.descriptor.MDArtifact
import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IBiblioResolver}
+import org.apache.ivy.plugins.resolver.{AbstractResolver, ChainResolver, FileSystemResolver, IBiblioResolver}
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
@@ -66,22 +68,25 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("create repo resolvers") {
val settings = new IvySettings
- val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
+ val res1 = SparkSubmitUtils.createRepoResolvers(settings.getDefaultIvyUserDir)
// should have central and spark-packages by default
assert(res1.getResolvers.size() === 4)
assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")
+ }
+ test("create additional resolvers") {
val repos = "a/1,b/2,c/3"
- val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
- assert(resolver2.getResolvers.size() === 7)
+ val settings = SparkSubmitUtils.buildIvySettings(Option(repos), None)
+ val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
+ assert(resolver.getResolvers.size() === 4)
val expected = repos.split(",").map(r => s"$r/")
- resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
- if (i < 3) {
- assert(resolver.getName === s"repo-${i + 1}")
- assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
+ resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
+      if (i > 0) {
+ assert(resolver.getName === s"repo-$i")
+ assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
}
}
}
@@ -126,8 +131,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
- Option(tempIvyPath), isTest = true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(Option(repo), Option(tempIvyPath)),
+ isTest = true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}
@@ -137,7 +144,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val dep = "my.great.dep:mydep:0.5"
// Local M2 repository
IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -146,7 +155,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val settings = new IvySettings
val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -156,8 +167,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
settings.setDefaultIvyUserDir(new File(tempIvyPath))
IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
ivySettings = settings) { repo =>
- val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
- Some(tempIvyPath), isTest = true)
+ val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+ isTest = true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -166,7 +179,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("dependency not found throws RuntimeException") {
intercept[RuntimeException] {
- SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
+ SparkSubmitUtils.resolveMavenCoordinates(
+ "a:b:c",
+ SparkSubmitUtils.buildIvySettings(None, None),
+ isTest = true)
}
}
@@ -178,12 +194,17 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
",org.apache.spark:spark-core_fake:1.2.0"
- val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
+ val path = SparkSubmitUtils.resolveMavenCoordinates(
+ coordinates,
+ SparkSubmitUtils.buildIvySettings(None, None),
+ isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
- Some(repo), None, isTest = true)
+ val files = SparkSubmitUtils.resolveMavenCoordinates(
+ coordinates + "," + main.toString,
+ SparkSubmitUtils.buildIvySettings(Some(repo), None),
+ isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
}
}
@@ -192,10 +213,49 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = "my.great.dep:mydep:0.5"
IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
- val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
- Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+ val files = SparkSubmitUtils.resolveMavenCoordinates(
+ main.toString,
+ SparkSubmitUtils.buildIvySettings(Some(repo), None),
+ Seq("my.great.dep:mydep"),
+ isTest = true)
assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
}
}
+
+ test("load ivy settings file") {
+ val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val dep = "my.great.dep:mydep:0.5"
+ val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
+ val settingsText =
+ s"""
+ |
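Whatever the exact settings text used here, a minimal sketch of driving loadIvySettings end to end against a local filesystem repository could look as follows; the resolver name, patterns and layout are illustrative assumptions rather than the literal contents of this patch, and the snippet reuses main, dep, dummyIvyLocal and tempIvyPath defined above:

// Sketch only: "test-local" and the patterns below are illustrative assumptions.
val settingsXml =
  s"""
     |<ivysettings>
     |  <settings defaultResolver="test-local"/>
     |  <resolvers>
     |    <filesystem name="test-local">
     |      <ivy pattern="$dummyIvyLocal/[organisation]/[module]/[revision]/ivys/ivy.xml"/>
     |      <artifact pattern="$dummyIvyLocal/[organisation]/[module]/[revision]/[type]s/[artifact].[ext]"/>
     |    </filesystem>
     |  </resolvers>
     |</ivysettings>
     |""".stripMargin
val settingsFile = new File(tempIvyPath, "ivysettings.xml")
Files.write(settingsXml, settingsFile, StandardCharsets.UTF_8)
val settings = SparkSubmitUtils.loadIvySettings(settingsFile.getAbsolutePath, None, None)

IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true) { repo =>
  val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, isTest = true)
  assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
  assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
}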
diff --git a/docs/configuration.md b/docs/configuration.md
--- a/docs/configuration.md
+++ b/docs/configuration.md
 <tr>
   <td><code>spark.jars.packages</code></td>
   <td></td>
   <td>
-    classpaths. Will search the local maven repo, then maven central and any additional remote
-    repositories given by <code>spark.jars.ivy</code>. The format for the coordinates should be
-    groupId:artifactId:version.
+    classpaths. The coordinates should be groupId:artifactId:version. If <code>spark.jars.ivySettings</code>
+    is given artifacts will be resolved according to the configuration in the file, otherwise artifacts
+    will be searched for in the local maven repo, then maven central and finally any additional remote
+    repositories given by the command-line option <code>--repositories</code>. For more details, see
+    Advanced Dependency Management.
   </td>
 </tr>
 <tr>
   <td><code>spark.jars.ivy</code></td>
   <td></td>
   <td>
+    Path to specify the Ivy user directory, used for the local Ivy cache and package files from
+    <code>spark.jars.packages</code>. This will override the Ivy property <code>ivy.default.ivy.user.dir</code>
+    which defaults to ~/.ivy2.
   </td>
 </tr>
+<tr>
+  <td><code>spark.jars.ivySettings</code></td>
+  <td></td>
+  <td>
+    Path to an Ivy settings file to customize resolution of jars specified using <code>spark.jars.packages</code>
+    instead of the built-in defaults, such as maven central. Additional repositories given by the command-line
+    option <code>--repositories</code> will also be included. Useful for allowing Spark to resolve artifacts from behind
+    a firewall e.g. via an in-house artifact server like Artifactory. Details on the settings file format can be
+    found at http://ant.apache.org/ivy/history/latest-milestone/settings.html
+  </td>
+</tr>
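For illustration, the new spark.jars.ivySettings property would typically be combined with --packages on the command line; the settings path and coordinate below are placeholders:

./bin/spark-submit \
  --conf spark.jars.ivySettings=/path/to/ivysettings.xml \
  --packages com.example:somelib:1.0 \
  /path/to/app.jar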