17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -443,7 +443,6 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -480,7 +479,12 @@
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.ivy"),
OptionAssigner(args.repositories, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.jars.repositories"),
Contributor
this is a new option?

Contributor Author
Yes, it is used to store the user's repositories, so we can reuse them when downloading the Hive jars.

Member
We need to document it in http://spark.apache.org/docs/latest/configuration.html, like what we did for spark.jars.ivy
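As an illustration (a minimal sketch; the coordinate and repository URL below are hypothetical), the new property mirrors the --repositories command-line option and can also be set programmatically on a SparkConf:

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --packages and --repositories to spark-submit: the
// repository listed here is searched in addition to the default resolvers.
val conf = new SparkConf()
  .set("spark.jars.packages", "com.example:mylib:1.0.0")            // hypothetical coordinate
  .set("spark.jars.repositories", "https://repo.example.com/maven") // hypothetical repository
```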

OptionAssigner(args.sparkProperties.get("spark.jars.ivySettings").orNull,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars.ivySettings")
)

// In client mode, launch the application main class directly
@@ -1027,10 +1031,8 @@ private[spark] object SparkSubmitUtils {
val cr = new ChainResolver
cr.setName("user-list")

// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)

// add additional repositories, last resolution in chain takes precedence
// Add additional repositories before the default resolvers;
// the last resolution in the chain takes precedence
repositoryList.zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
@@ -1043,6 +1045,9 @@
// scalastyle:on println
}

// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)

ivySettings.addResolver(cr)
ivySettings.setDefaultResolver(cr.getName)
}
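To make the new resolution order concrete, here is a hedged sketch (the repository URL and coordinate are hypothetical; SparkSubmitUtils is private[spark], so this mirrors the test suite's usage rather than a public API):

```scala
import org.apache.spark.deploy.SparkSubmitUtils

// Build Ivy settings with one additional repository. After this change the
// "user-list" chain consults the additional repository before the default
// resolvers (such as the local m2 cache and Maven Central).
val settings = SparkSubmitUtils.buildIvySettings(
  Some("https://repo.example.com/maven"), // hypothetical additional repository
  None)                                   // no custom ivy path

// resolveMavenCoordinates returns a comma-separated list of resolved jar paths.
val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
  "com.example:mylib:1.0.0",              // hypothetical coordinate
  settings,
  isTest = false)
```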
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{File, OutputStream, PrintStream}
import java.io.{File, FileInputStream, OutputStream, PrintStream}
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
@@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -85,8 +86,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val expected = repos.split(",").map(r => s"$r/")
resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
if (1 < i && i < 3) {
assert(resolver.getName === s"repo-$i")
assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
assert(resolver.getName === s"repo-${i + 1}")
assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
}
}
}
@@ -258,4 +259,51 @@
assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
}
}

test("search for artifact taking order from user defined repositories to default repositories") {
val main = new MavenCoordinate("a", "b", "0.1")

def isSameFile(left: String, right: String): Boolean = {
val leftInput: FileInputStream = new FileInputStream(left)
val leftMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(leftInput)

val rightInput: FileInputStream = new FileInputStream(right)
val rightMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(rightInput)

leftMd5 == rightMd5
}

val userDefinedRepo = Utils.createTempDir("my_m2")
try {
IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) { repo =>
IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) {
defaultRepo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Option(repo), None),
isTest = false)
assert(isSameFile(Seq(userDefinedRepo, main.groupId, main.artifactId, main.version,
"b-0.1.jar").mkString(File.separatorChar.toString), jarPath))
assert(jarPath.indexOf("b") >= 0, "should find artifact")

}
}

IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { defaultRepo =>
IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) {
repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
main.toString,
SparkSubmitUtils.buildIvySettings(Option(repo), None),
isTest = false)
assert(isSameFile(Seq(SparkSubmitUtils.m2Path.getCanonicalPath, main.groupId,
main.artifactId, main.version, "b-0.1.jar").mkString(File.separatorChar.toString),
jarPath))
assert(jarPath.indexOf("b") >= 0, "should find artifact")
}
}
} finally {
Utils.deleteRecursively(userDefinedRepo)
}
}
}
15 changes: 12 additions & 3 deletions docs/configuration.md
@@ -438,9 +438,10 @@ Apart from these, the following properties are also available, and may be useful
Comma-separated list of Maven coordinates of jars to include on the driver and executor
classpaths. The coordinates should be groupId:artifactId:version. If <code>spark.jars.ivySettings</code>
is given artifacts will be resolved according to the configuration in the file, otherwise artifacts
will be searched for in the local maven repo, then maven central and finally any additional remote
repositories given by the command-line option <code>--repositories</code>. For more details, see
<a href="submitting-applications.html#advanced-dependency-management">Advanced Dependency Management</a>.
will be searched for in any additional remote repositories given by the command-line option
<code>--repositories</code>, then the local maven repository (${user.home}/.m2/repository), and finally maven central.
For more details, see <a href="submitting-applications.html#advanced-dependency-management">
Advanced Dependency Management</a>.
</td>
</tr>
<tr>
@@ -451,6 +452,14 @@ Apart from these, the following properties are also available, and may be useful
provided in <code>spark.jars.packages</code> to avoid dependency conflicts.
</td>
</tr>
<tr>
<td><code>spark.jars.repositories</code></td>
<td></td>
<td>
Comma-separated list of additional remote repositories to search for the maven coordinates
given in <code>spark.jars.packages</code>. It can also be set with the command-line option
<code>--repositories</code>.
</td>
</tr>
<tr>
<td><code>spark.jars.ivy</code></td>
<td></td>
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -327,7 +327,6 @@ private[spark] object HiveUtils extends Logging {
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
// TODO: Support for loading the jars from an already downloaded location.
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -59,7 +59,7 @@ private[hive] object IsolatedClientLoader extends Logging {
} else {
val (downloadedFiles, actualHadoopVersion) =
try {
(downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
(downloadVersion(resolvedVersion, hadoopVersion, sparkConf, ivyPath), hadoopVersion)
} catch {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
@@ -70,7 +70,7 @@ private[hive] object IsolatedClientLoader extends Logging {
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
(downloadVersion(resolvedVersion, "2.6.5", sparkConf, ivyPath), "2.6.5")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
resolvedVersions((resolvedVersion, actualHadoopVersion))
@@ -99,28 +99,37 @@ private[hive] object IsolatedClientLoader extends Logging {
private def downloadVersion(
version: HiveVersion,
hadoopVersion: String,
sparkConf: SparkConf,
ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1",
s"org.apache.hadoop:hadoop-client:$hadoopVersion")

// If the repositories contain a local repository, jars will not be downloaded from the remote repositories
val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories", ""))
.filterNot(_.isEmpty).map { repo =>
Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",")
}.orElse(Some("http://www.datanucleus.org/downloads/maven2"))

val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy", "")).filterNot(_.isEmpty)
val ivySettings = Option(sparkConf.get("spark.jars.ivySettings", ""))
.filterNot(_.isEmpty).map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath)
}.getOrElse {
SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath)
}

val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
hiveArtifacts.mkString(","),
SparkSubmitUtils.buildIvySettings(
Some("http://www.datanucleus.org/downloads/maven2"),
ivyPath),
ivySettings,
exclusions = version.exclusions)
}
val allFiles = classpath.split(",").map(new File(_)).toSet

// TODO: Remove copy logic.
val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}")
tempDir.listFiles().map(_.toURI.toURL)
logInfo(s"Downloaded metastore jars location: $classpath")
classpath.split(",").map(new File(_).toURI.toURL)
}

// A map from a given pair of HiveVersion and Hadoop version to jar files.
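Taken together, a hedged end-to-end sketch of what this change enables (the repository URL is hypothetical and the metastore version is only an example): when spark.sql.hive.metastore.jars is set to maven, the metastore client jars can now also be fetched from the repositories configured via spark.jars.repositories:

```scala
import org.apache.spark.sql.SparkSession

// The Hive metastore client jars are downloaded through Ivy using the same
// repository configuration that spark-submit uses for --packages.
val spark = SparkSession.builder()
  .appName("hive-metastore-from-maven")
  .config("spark.sql.hive.metastore.version", "1.2.1")                 // example metastore version
  .config("spark.sql.hive.metastore.jars", "maven")
  .config("spark.jars.repositories", "https://repo.example.com/maven") // hypothetical repository
  .enableHiveSupport()
  .getOrCreate()
```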