From 5c8727dfdd8de8b17cdeab4aba8b316f8063863e Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sat, 4 Feb 2017 17:23:14 +0800
Subject: [PATCH 01/13] [SPARK-19458][SQL] Load Hive jars from a local repo
 into which they have already been downloaded

---
 .../org/apache/spark/deploy/SparkSubmit.scala |  8 +++--
 .../org/apache/spark/sql/hive/HiveUtils.scala |  1 -
 .../hive/client/IsolatedClientLoader.scala    | 30 ++++++++++++-------
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5ffdedd1658a..2c852d9e79c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -443,7 +443,6 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -480,7 +479,12 @@ object SparkSubmit extends CommandLineUtils {
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.repositories, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.sparkProperties.get("spark.jars.ivySettings").orNull,
+        ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars.ivySettings")
     )

     // In client mode, launch the application main class directly
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 26b1994308f5..62b6662d96cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -320,7 +320,6 @@ private[spark] object HiveUtils extends Logging {
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
-      // TODO: Support for loading the jars from an already downloaded location.
logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") IsolatedClientLoader.forVersion( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 63fdd6b090e6..1acf894154bb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -59,7 +59,7 @@ private[hive] object IsolatedClientLoader extends Logging { } else { val (downloadedFiles, actualHadoopVersion) = try { - (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion) + (downloadVersion(resolvedVersion, hadoopVersion, sparkConf, ivyPath), hadoopVersion) } catch { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop @@ -73,7 +73,7 @@ private[hive] object IsolatedClientLoader extends Logging { "It is recommended to set jars used by Hive metastore client through " + "spark.sql.hive.metastore.jars in the production environment.") sharesHadoopClasses = false - (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0") + (downloadVersion(resolvedVersion, "2.4.0", sparkConf, ivyPath), "2.4.0") } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion)) @@ -102,6 +102,7 @@ private[hive] object IsolatedClientLoader extends Logging { private def downloadVersion( version: HiveVersion, hadoopVersion: String, + sparkConf: SparkConf, ivyPath: Option[String]): Seq[URL] = { val hiveArtifacts = version.extraDeps ++ Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") @@ -109,21 +110,28 @@ private[hive] object IsolatedClientLoader extends Logging { Seq("com.google.guava:guava:14.0.1", s"org.apache.hadoop:hadoop-client:$hadoopVersion") + // if repositories contain a local repo, it will not download jars from remote repo + val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories")).map { + repo => + Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",") + }.orElse(Some("http://www.datanucleus.org/downloads/maven2")) + + val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy")) + val ivySettings = Option(sparkConf.get("spark.jars.ivySettings")).map { ivySettingsFile => + SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath) + }.getOrElse { + SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath) + } + val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), - SparkSubmitUtils.buildIvySettings( - Some("http://www.datanucleus.org/downloads/maven2"), - ivyPath), + ivySettings, exclusions = version.exclusions) } - val allFiles = classpath.split(",").map(new File(_)).toSet - // TODO: Remove copy logic. - val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") - allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - logInfo(s"Downloaded metastore jars to ${tempDir.getCanonicalPath}") - tempDir.listFiles().map(_.toURI.toURL) + logInfo(s"Downloaded metastore jars location: $classpath") + classpath.split(",").map(new File(_).toURI.toURL) } // A map from a given pair of HiveVersion and Hadoop version to jar files. 
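The hunk above replaces the hard-coded DataNucleus-only resolution settings in downloadVersion with settings derived from the session's SparkConf. For readers following along, here is a minimal self-contained sketch of the precedence it introduces; the helper name metastoreIvySettings is hypothetical, the loadIvySettings/buildIvySettings signatures are taken from the hunk above, and getOption is used instead of get because SparkConf.get without a default throws for an unset key (the issue patch 10 below addresses):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkSubmitUtils
    import org.apache.ivy.core.settings.IvySettings

    // Sketch of the settings precedence introduced by this patch:
    //   1. spark.jars.ivySettings, when set, wins and is loaded from file;
    //   2. otherwise settings are built from spark.jars.repositories (searched
    //      ahead of the hard-coded datanucleus repo) plus spark.jars.ivy as the
    //      local Ivy path.
    def metastoreIvySettings(sparkConf: SparkConf): IvySettings = {
      val repos: Option[String] = sparkConf.getOption("spark.jars.repositories")
        .map(r => s"$r,http://www.datanucleus.org/downloads/maven2")
        .orElse(Some("http://www.datanucleus.org/downloads/maven2"))
      val ivyRepoPath: Option[String] = sparkConf.getOption("spark.jars.ivy")
      sparkConf.getOption("spark.jars.ivySettings") match {
        case Some(file) => SparkSubmitUtils.loadIvySettings(file, repos, ivyRepoPath)
        case None => SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath)
      }
    }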
From 0c76584ce3a434265c9f06d6ce90518c7c205398 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Sun, 5 Feb 2017 00:10:36 +0800
Subject: [PATCH 02/13] put user defined repo before default repo

---
 .../org/apache/spark/deploy/SparkSubmit.scala |  9 +--
 .../apache/spark/deploy/IvyTestUtils.scala    |  4 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala  | 57 ++++++++++++++++++-
 .../org/apache/spark/sql/hive/HiveUtils.scala |  1 +
 .../hive/client/IsolatedClientLoader.scala    |  2 +-
 .../spark/sql/hive/HiveUtilsSuite.scala       |  6 +-
 .../sql/hive/client/HiveClientSuite.scala     |  3 +-
 7 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 2c852d9e79c5..96cf8d19353f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -1031,10 +1031,8 @@ private[spark] object SparkSubmitUtils {
     val cr = new ChainResolver
     cr.setName("user-list")

-    // add current default resolver, if any
-    Option(ivySettings.getDefaultResolver).foreach(cr.add)
-
-    // add additional repositories, last resolution in chain takes precedence
+    // add user-defined repositories ahead of the default resolvers,
+    // so that they take precedence during resolution
     repositoryList.zipWithIndex.foreach { case (repo, i) =>
       val brr: IBiblioResolver = new IBiblioResolver
       brr.setM2compatible(true)
@@ -1047,6 +1045,9 @@ private[spark] object SparkSubmitUtils {
       // scalastyle:on println
     }

+    // add current default resolver, if any
+    Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
     ivySettings.addResolver(cr)
     ivySettings.setDefaultResolver(cr.getName)
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index f50cb38311db..600665591e8d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -31,7 +31,7 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString}
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate

-private[deploy] object IvyTestUtils {
+object IvyTestUtils {

   /**
    * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
@@ -355,7 +355,7 @@ private[deploy] object IvyTestUtils {
    * @param withPython Whether to pack python files inside the jar for extensive testing.
    * @return Root path of the repository. Will be `rootDir` if supplied.
 */
-  private[deploy] def withRepository(
+  def withRepository(
       artifact: MavenCoordinate,
       dependencies: Option[String],
       rootDir: Option[File],
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 266c9d33b5a9..e9367eaedf1d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-import java.io.{File, OutputStream, PrintStream}
+import java.io.{File, FileInputStream, OutputStream, PrintStream}
 import java.nio.charset.StandardCharsets

 import scala.collection.mutable.ArrayBuffer
@@ -30,6 +30,7 @@ import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

 class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -142,12 +143,13 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
   test("search for artifact at local repositories") {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
-    // Local M2 repository
+
+    // Local M2 repositorya
     IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
       val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
         main.toString,
         SparkSubmitUtils.buildIvySettings(None, None),
-        isTest = true)
+        isTest = false)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
@@ -258,4 +260,53 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
     }
   }
+
+  test("search for artifact taking order from user defined repositories to default repositories") {
+    val main = new MavenCoordinate("a", "b", "0.1")
+
+    def isSameFile(left: String, right: String): Boolean = {
+      val leftInput: FileInputStream = new FileInputStream(left)
+      val leftMd5 = UTF8String.fromString(org.apache.commons.codec
+        .digest.DigestUtils.md5Hex(leftInput))
+
+      val rightInput: FileInputStream = new FileInputStream(right)
+      val rightMd5 = UTF8String.fromString(org.apache.commons.codec
+        .digest.DigestUtils.md5Hex(rightInput))
+
+      leftMd5 == rightMd5
+    }
+
+    val userDefinedRepo = Utils.createTempDir(namePrefix = "my_m2")
+    try {
+      IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) { repo =>
+        IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) {
+          defaultRepo =>
+            val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+              main.toString,
+              SparkSubmitUtils.buildIvySettings(Option(repo), None),
+              isTest = false)
+            assert(isSameFile(Seq(userDefinedRepo, main.groupId, main.artifactId, main.version,
+              "b-0.1.jar").mkString(File.separatorChar.toString), jarPath))
+            assert(jarPath.indexOf("b") >= 0, "should find artifact")
+
+        }
+      }
+
+      IvyTestUtils.withRepository(main, None, Some(SparkSubmitUtils.m2Path)) { defaultRepo =>
+        IvyTestUtils.withRepository(main, None, Some(userDefinedRepo)) {
+          repo =>
+            val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+              main.toString,
+              SparkSubmitUtils.buildIvySettings(Option(repo), None),
+              isTest = false)
+            assert(isSameFile(Seq(SparkSubmitUtils.m2Path.getCanonicalPath, main.groupId,
+              main.artifactId, main.version,
"b-0.1.jar").mkString(File.separatorChar.toString), + jarPath)) + assert(jarPath.indexOf("b") >= 0, "should find artifact") + } + } + } finally { + Utils.deleteRecursively(userDefinedRepo) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 62b6662d96cd..c1cbd27bc343 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -328,6 +328,7 @@ private[spark] object HiveUtils extends Logging { sparkConf = conf, hadoopConf = hadoopConf, config = configurations, + ivyPath = conf.getOption("spark.jars.ivy"), barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 1acf894154bb..91be4814b2f8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -136,7 +136,7 @@ private[hive] object IsolatedClientLoader extends Logging { // A map from a given pair of HiveVersion and Hadoop version to jar files. // It is only used by forVersion. - private val resolvedVersions = + private[hive] val resolvedVersions = new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 667a7ddd8bb6..6872aecb8c63 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,11 +17,15 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars - +import org.apache.spark.SparkConf +import org.apache.spark.deploy.IvyTestUtils +import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.IsolatedClientLoader class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 4790331168bd..5e56dc23a462 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.hive.client import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf - import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.IvyTestUtils +import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} import org.apache.spark.sql.hive.HiveUtils From 338aed502575dd744b691b2e901a2af97d48037a Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:12:54 +0800 Subject: [PATCH 03/13] fix a code style --- 
.../org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index e9367eaedf1d..b388bcb916a0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -143,13 +143,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("search for artifact at local repositories") { val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") val dep = "my.great.dep:mydep:0.5" - // Local M2 repositorya IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo => val jarPath = SparkSubmitUtils.resolveMavenCoordinates( main.toString, SparkSubmitUtils.buildIvySettings(None, None), - isTest = false) + isTest = true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") assert(jarPath.indexOf("mydep") >= 0, "should find dependency") } @@ -260,7 +259,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(jarPath.indexOf("mydep") >= 0, "should find dependency") } } - + test("search for artifact taking order from user defined repositories to default repositories") { val main = new MavenCoordinate("a", "b", "0.1") From 289189147cea836cf0130c6f820aacd007e311a0 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:14:49 +0800 Subject: [PATCH 04/13] revert IvyTestUtils --- .../src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala index 600665591e8d..f50cb38311db 100644 --- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala @@ -31,7 +31,7 @@ import org.apache.ivy.core.settings.IvySettings import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString} import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate -object IvyTestUtils { +private[deploy] object IvyTestUtils { /** * Create the path for the jar and pom from the maven coordinate. Extension should be `jar` @@ -355,7 +355,7 @@ object IvyTestUtils { * @param withPython Whether to pack python files inside the jar for extensive testing. * @return Root path of the repository. Will be `rootDir` if supplied. 
*/ - def withRepository( + private[deploy] def withRepository( artifact: MavenCoordinate, dependencies: Option[String], rootDir: Option[File], From 6e667936348698db3ece47f8159194ca91c7064b Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:16:20 +0800 Subject: [PATCH 05/13] fix a comment --- .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index b388bcb916a0..2ce5e3926929 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -143,7 +143,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("search for artifact at local repositories") { val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") val dep = "my.great.dep:mydep:0.5" - // Local M2 repositorya + // Local M2 repository IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo => val jarPath = SparkSubmitUtils.resolveMavenCoordinates( main.toString, From c790a4b2f376645b0ee87e34b7840a0f4f1d3c36 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:18:53 +0800 Subject: [PATCH 06/13] revert HiveUtilsSuite --- .../scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index 6872aecb8c63..667a7ddd8bb6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,15 +17,11 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkConf -import org.apache.spark.deploy.IvyTestUtils -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate + import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.hive.client.IsolatedClientLoader class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { From 5ab3cdc171709ee0b4a7fccc50701a42c91a0be9 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:22:55 +0800 Subject: [PATCH 07/13] remove ivyPath from HiveUtils --- .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index c1cbd27bc343..62b6662d96cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -328,7 +328,6 @@ private[spark] object HiveUtils extends Logging { sparkConf = conf, hadoopConf = hadoopConf, config = configurations, - ivyPath = conf.getOption("spark.jars.ivy"), barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else { From 41466ffce218e7644ad7943573bd133c514460dd Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:29:16 +0800 Subject: [PATCH 08/13] fix some code style --- 
.../apache/spark/sql/hive/client/IsolatedClientLoader.scala | 2 +- .../org/apache/spark/sql/hive/client/HiveClientSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 91be4814b2f8..9997ecb4e1ce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -293,5 +293,5 @@ private[hive] class IsolatedClientLoader( * The place holder for shared Hive client for all the HiveContext sessions (they share an * IsolatedClientLoader). */ - private[hive] var cachedHive: Any = null + private var cachedHive: Any = null } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 5e56dc23a462..4790331168bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.hive.client import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf + import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.IvyTestUtils -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} import org.apache.spark.sql.hive.HiveUtils From 5570e74207d6ffef70e937862142b72082680ca8 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:30:44 +0800 Subject: [PATCH 09/13] fix some code style --- .../apache/spark/sql/hive/client/IsolatedClientLoader.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 9997ecb4e1ce..1acf894154bb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -136,7 +136,7 @@ private[hive] object IsolatedClientLoader extends Logging { // A map from a given pair of HiveVersion and Hadoop version to jar files. // It is only used by forVersion. - private[hive] val resolvedVersions = + private val resolvedVersions = new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]] } @@ -293,5 +293,5 @@ private[hive] class IsolatedClientLoader( * The place holder for shared Hive client for all the HiveContext sessions (they share an * IsolatedClientLoader). 
*/ - private var cachedHive: Any = null + private[hive] var cachedHive: Any = null } From 630b019cdc06ff4ec89ddf3d77c925b04510592c Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 00:37:22 +0800 Subject: [PATCH 10/13] fix some test failed --- .../spark/sql/hive/client/IsolatedClientLoader.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 1acf894154bb..c63d1660ba1c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -111,13 +111,14 @@ private[hive] object IsolatedClientLoader extends Logging { s"org.apache.hadoop:hadoop-client:$hadoopVersion") // if repositories contain a local repo, it will not download jars from remote repo - val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories")).map { - repo => + val repos: Option[String] = Option(sparkConf.get("spark.jars.repositories", "")) + .filterNot(_.isEmpty).map { repo => Seq(repo, "http://www.datanucleus.org/downloads/maven2").mkString(",") }.orElse(Some("http://www.datanucleus.org/downloads/maven2")) - val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy")) - val ivySettings = Option(sparkConf.get("spark.jars.ivySettings")).map { ivySettingsFile => + val ivyRepoPath = Option(sparkConf.get("spark.jars.ivy", "")).filterNot(_.isEmpty) + val ivySettings = Option(sparkConf.get("spark.jars.ivySettings", "")) + .filterNot(_.isEmpty).map { ivySettingsFile => SparkSubmitUtils.loadIvySettings(ivySettingsFile, repos, ivyRepoPath) }.getOrElse { SparkSubmitUtils.buildIvySettings(repos, ivyRepoPath) From d61c077e638a63e432324a56c02c6b613dc4b28c Mon Sep 17 00:00:00 2001 From: windpiger Date: Sun, 5 Feb 2017 12:10:24 +0800 Subject: [PATCH 11/13] user defined repo first --- .../scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 2ce5e3926929..8712a10b478a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -86,8 +86,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val expected = repos.split(",").map(r => s"$r/") resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) => if (1 < i && i < 3) { - assert(resolver.getName === s"repo-$i") - assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1)) + assert(resolver.getName === s"repo-${i + 1}") + assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i)) } } } From 1bb31e51a73565a07dc703edf51578762a47f5b2 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Feb 2017 15:34:10 +0800 Subject: [PATCH 12/13] remove utf-8 string --- .../org/apache/spark/deploy/SparkSubmitUtilsSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 8712a10b478a..493c9ca610b9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala 
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -265,12 +265,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

     def isSameFile(left: String, right: String): Boolean = {
       val leftInput: FileInputStream = new FileInputStream(left)
-      val leftMd5 = UTF8String.fromString(org.apache.commons.codec
-        .digest.DigestUtils.md5Hex(leftInput))
+      val leftMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(leftInput)

       val rightInput: FileInputStream = new FileInputStream(right)
-      val rightMd5 = UTF8String.fromString(org.apache.commons.codec
-        .digest.DigestUtils.md5Hex(rightInput))
+      val rightMd5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(rightInput)

       leftMd5 == rightMd5
     }

From 9330e3542e7bcf3e4554c3ec26f1b6387f96f810 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Feb 2017 23:36:36 +0800
Subject: [PATCH 13/13] add doc

---
 docs/configuration.md | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 7c040330db63..fc1eb4168e5d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -438,9 +438,10 @@ Apart from these, the following properties are also available, and may be useful
     Comma-separated list of Maven coordinates of jars to include on the driver and executor
     classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings
     is given artifacts will be resolved according to the configuration in the file, otherwise artifacts
-    will be searched for in the local maven repo, then maven central and finally any additional remote
-    repositories given by the command-line option --repositories. For more details, see
-    Advanced Dependency Management.
+    will be searched for in any additional remote repositories given by the command-line option
+    --repositories, then the local maven repo (${user.home}/.m2/repository), and finally maven central.
+    For more details, see
+    Advanced Dependency Management.

   spark.jars.excludes

     Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies
     provided in spark.jars.packages to avoid dependency conflicts.

+  spark.jars.repositories
+
+    Comma-separated list of additional remote repositories to search for the maven coordinates.
+    It can also be set with the command-line option --repositories.
+
   spark.jars.ivy
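
Taken together, the series lets spark.sql.hive.metastore.jars=maven resolve the metastore client jars from an already-populated local repository instead of always downloading from remote repositories. A hypothetical end-to-end configuration in Scala — the mirror URL, cache path, and app name below are illustrative assumptions, not values taken from the patches:

    import org.apache.spark.sql.SparkSession

    // Resolve the Hive metastore client jars via Ivy ("maven") rather than using
    // Spark's built-in Hive classes. Because patch 02 puts user-defined
    // repositories ahead of the default resolvers, the local mirror below is
    // consulted before the local m2 repo and maven central, so nothing is
    // fetched remotely when the jars are already present.
    val spark = SparkSession.builder()
      .appName("metastore-from-local-repo")
      .config("spark.sql.hive.metastore.version", "1.2.1")
      .config("spark.sql.hive.metastore.jars", "maven")
      .config("spark.jars.repositories", "file:///opt/mirror/m2") // illustrative path
      .config("spark.jars.ivy", "/opt/ivy-cache")                 // illustrative path
      .enableHiveSupport()
      .getOrCreate()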