Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 103 additions & 46 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.io.{File, IOException, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
import java.text.ParseException

import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
Expand Down Expand Up @@ -283,8 +284,17 @@ object SparkSubmit extends CommandLineUtils {
} else {
Nil
}

// Create the IvySettings, either load from file or build defaults
val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
Option(args.ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
}

val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
ivySettings, exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
Expand Down Expand Up @@ -860,30 +870,13 @@ private[spark] object SparkSubmitUtils {

/**
* Creates the chain of default repository resolvers that Ivy searches
* @param remoteRepos Comma-delimited string of remote repositories
* @param ivySettings The Ivy settings for this session
* @param defaultIvyUserDir The default user path for Ivy
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("list")

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
// scalastyle:off println
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
// scalastyle:on println
}
}
cr.setName("spark-list")

val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
Expand All @@ -893,7 +886,7 @@ private[spark] object SparkSubmitUtils {
cr.add(localM2)

val localIvy = new FileSystemResolver
val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
val localIvyRoot = new File(defaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
Expand Down Expand Up @@ -974,23 +967,102 @@ private[spark] object SparkSubmitUtils {
}
}

/**
 * Assemble a fresh IvySettings from the supplied options, wired with the default resolvers.
 * @param remoteRepos Comma-delimited string of remote repositories other than maven central
 * @param ivyPath The path to the local ivy repository
 * @return An IvySettings object
 */
def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
  val settings = new IvySettings

  // Apply the ivy path override first: the default resolvers created below are rooted at
  // whatever user directory the settings report at that point.
  processIvyPathArg(settings, ivyPath)

  // Register the glob pattern matcher
  settings.addMatcher(new GlobPatternMatcher)

  // Install the default resolver chain and make it the default resolver
  val defaultChain = createRepoResolvers(settings.getDefaultIvyUserDir)
  settings.addResolver(defaultChain)
  settings.setDefaultResolver(defaultChain.getName)

  // Finally layer any user-supplied remote repositories on top
  processRemoteRepoArg(settings, remoteRepos)
  settings
}

/**
 * Read Ivy settings from a settings file, then apply the optional ivy path and
 * remote repository arguments on top of what was loaded.
 * @param settingsFile Path to Ivy settings file
 * @param remoteRepos Comma-delimited string of remote repositories other than maven central
 * @param ivyPath The path to the local ivy repository
 * @return An IvySettings object
 */
def loadIvySettings(
    settingsFile: String,
    remoteRepos: Option[String],
    ivyPath: Option[String]): IvySettings = {
  // Validate the path up front; both checks fail with IllegalArgumentException via require
  val settingsSource = new File(settingsFile)
  require(settingsSource.exists(), s"Ivy settings file $settingsSource does not exist")
  require(settingsSource.isFile(), s"Ivy settings file $settingsSource is not a normal file")

  val loaded = new IvySettings
  try {
    loaded.load(settingsSource)
  } catch {
    // Read and parse failures are both reported as a SparkException carrying the cause
    case ioError: IOException =>
      throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", ioError)
    case parseError: ParseException =>
      throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", parseError)
  }

  processIvyPathArg(loaded, ivyPath)
  processRemoteRepoArg(loaded, remoteRepos)
  loaded
}

/* Point Ivy's user directory and cache at the supplied path, if a non-blank one was given */
private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
  // A missing or whitespace-only path leaves the settings untouched
  for (customIvyDir <- ivyPath if customIvyDir.trim.nonEmpty) {
    val userDir = new File(customIvyDir)
    val cacheDir = new File(customIvyDir, "cache")
    ivySettings.setDefaultIvyUserDir(userDir)
    ivySettings.setDefaultCache(cacheDir)
  }
}

/* Add any optional additional remote repositories */
private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
val cr = new ChainResolver
cr.setName("user-list")

// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)

// add additional repositories, last resolution in chain takes precedence
repositoryList.zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
// scalastyle:off println
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's used in testing IIRC and is helpful in debugging

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was in there before and I think it's nice to get confirmation that your additional repository was added with that alias, in case the artifact you need isn't resolved. It will print out all the info like this

bin/run-example --repositories www.somerepo.com --packages some:package:1.0 SparkPi
Listening for transport dt_socket at address: 5005
www.somerepo.com added as a remote repository with the name: repo-1
Ivy Default Cache set to: /home/bryan/.ivy2/cache
The jars for the packages stored in: /home/bryan/.ivy2/jars
:: loading settings :: url = jar:file:/home/bryan/git/spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
some#package added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
:: resolution report :: resolve 1540ms :: artifacts dl 0ms
	:: modules in use:
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   0   |   0   |
	---------------------------------------------------------------------

:: problems summary ::
:::: WARNINGS
		::::::::::::::::::::::::::::::::::::::::::::::

		::          UNRESOLVED DEPENDENCIES         ::

		::::::::::::::::::::::::::::::::::::::::::::::

		:: some#package;1.0: repo-1: unable to get resource for some#package;1.0: res=www.somerepo.com/some/package/1.0/package-1.0.pom: java.net.MalformedURLException: no protocol: www.somerepo.com/some/package/1.0/package-1.0.pom

		::::::::::::::::::::::::::::::::::::::::::::::

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I saw later it was already there. I just generally dislike non-error log messages that cannot be turned off.

// scalastyle:on println
}

ivySettings.addResolver(cr)
ivySettings.setDefaultResolver(cr.getName)
}
}

/**
 * Create the module descriptor used for dependency resolution. The organisation, module
 * name and revision are dummy strings. Also convenient to call from tests.
 */
def getModuleDescriptor: DefaultModuleDescriptor = {
  val parentRevisionId =
    ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0")
  DefaultModuleDescriptor.newDefaultInstance(parentRevisionId)
}

/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @param ivySettings An IvySettings containing resolvers to use
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
ivySettings: IvySettings,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
Expand All @@ -1001,32 +1073,14 @@ private[spark] object SparkSubmitUtils {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
Expand All @@ -1042,6 +1096,9 @@ private[spark] object SparkSubmitUtils {
resolveOptions.setDownload(true)
}

// Default configuration name for ivy
val ivyConfName = "default"

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
// clear ivy resolution from previous launches. The resolution file is usually at
Expand Down
Loading