From e5d189c64f69718ba3abde85c79311492eddb198 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 19 Sep 2019 16:22:30 +0800 Subject: [PATCH 1/7] Add config DEFAULT_CENTRAL_REPOSITORY --- .../org/apache/spark/internal/config/package.scala | 7 +++++++ .../sql/hive/client/IsolatedClientLoader.scala | 13 ++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d142d22929728..951103a947afd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1421,4 +1421,11 @@ package object config { .bytesConf(ByteUnit.BYTE) .createOptional + private[spark] val DEFAULT_CENTRAL_REPOSITORY = + ConfigBuilder("spark.central.repository") + .doc("The default central repository used for downloading Hive jars " + + "in IsolatedClientLoader.") + .stringConf + .createWithDefaultString("https://repo1.maven.org/maven2") + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 1720af3b2b367..c24b53dd37e4f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DEFAULT_CENTRAL_REPOSITORY import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader @@ -54,6 +55,7 @@ private[hive] object IsolatedClientLoader extends Logging { barrierPrefixes: Seq[String] = Seq.empty, sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(hiveMetastoreVersion) + val centralRepo = sparkConf.get(DEFAULT_CENTRAL_REPOSITORY) // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact // with the given version, we will use Hadoop 2.7 and then will not share Hadoop classes. var _sharesHadoopClasses = sharesHadoopClasses @@ -62,7 +64,7 @@ private[hive] object IsolatedClientLoader extends Logging { } else { val (downloadedFiles, actualHadoopVersion) = try { - (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion) + (downloadVersion(resolvedVersion, hadoopVersion, ivyPath, centralRepo), hadoopVersion) } catch { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop @@ -74,7 +76,8 @@ private[hive] object IsolatedClientLoader extends Logging { "It is recommended to set jars used by Hive metastore client through " + "spark.sql.hive.metastore.jars in the production environment.") _sharesHadoopClasses = false - (downloadVersion(resolvedVersion, fallbackVersion, ivyPath), fallbackVersion) + (downloadVersion( + resolvedVersion, fallbackVersion, ivyPath, centralRepo), fallbackVersion) } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion)) @@ -112,18 +115,18 @@ private[hive] object IsolatedClientLoader extends Logging { private def downloadVersion( version: HiveVersion, hadoopVersion: String, - ivyPath: Option[String]): Seq[URL] = { + ivyPath: Option[String], + centralRepo: String): Seq[URL] = { val hiveArtifacts = version.extraDeps ++ Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ Seq("com.google.guava:guava:14.0.1", s"org.apache.hadoop:hadoop-client:$hadoopVersion") - val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), SparkSubmitUtils.buildIvySettings( - Some("https://maven-central.storage-download.googleapis.com/repos/central/data/"), + Some(centralRepo), ivyPath), exclusions = version.exclusions) } From 47f8402f121b564dda08091d65363fd13cfdc9c5 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 19 Sep 2019 20:01:30 +0800 Subject: [PATCH 2/7] move config to SQLConf --- .../scala/org/apache/spark/internal/config/package.scala | 7 ------- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 +++++++ .../spark/sql/hive/client/IsolatedClientLoader.scala | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 951103a947afd..d142d22929728 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1421,11 +1421,4 @@ package object config { .bytesConf(ByteUnit.BYTE) .createOptional - private[spark] val DEFAULT_CENTRAL_REPOSITORY = - ConfigBuilder("spark.central.repository") - .doc("The default central repository used for downloading Hive jars " + - "in IsolatedClientLoader.") - .stringConf - .createWithDefaultString("https://repo1.maven.org/maven2") - } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0ec661fc16c88..36c370e399764 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1979,6 +1979,13 @@ object SQLConf { .doc("When true, the ArrayExists will follow the three-valued boolean logic.") .booleanConf .createWithDefault(true) + + private[spark] val CENTRAL_REPOSITORY = + ConfigBuilder("spark.sql.centralRepository") + .doc("The default central repository used for downloading Hive jars " + + "in IsolatedClientLoader.") + .stringConf + .createWithDefault("https://repo1.maven.org/maven2") } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index c24b53dd37e4f..8fb9f85d9bdb6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -33,10 +33,10 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.DEFAULT_CENTRAL_REPOSITORY import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{MutableURLClassLoader, Utils} /** Factory for `IsolatedClientLoader` with specific versions of hive. */ @@ -55,7 +55,7 @@ private[hive] object IsolatedClientLoader extends Logging { barrierPrefixes: Seq[String] = Seq.empty, sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(hiveMetastoreVersion) - val centralRepo = sparkConf.get(DEFAULT_CENTRAL_REPOSITORY) + val centralRepo = sparkConf.get(SQLConf.CENTRAL_REPOSITORY) // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact // with the given version, we will use Hadoop 2.7 and then will not share Hadoop classes. var _sharesHadoopClasses = sharesHadoopClasses From a1f99cc852af9e6e685677d67ed2e4f17fdcb1af Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 20 Sep 2019 13:14:12 +0800 Subject: [PATCH 3/7] rename and set the config in hive tests --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 +++++----- .../spark/sql/hive/client/IsolatedClientLoader.scala | 10 +++++----- .../org/apache/spark/sql/hive/test/TestHive.scala | 6 +++++- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 36c370e399764..88f0dc4a88076 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1980,12 +1980,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - private[spark] val CENTRAL_REPOSITORY = - ConfigBuilder("spark.sql.centralRepository") - .doc("The default central repository used for downloading Hive jars " + - "in IsolatedClientLoader.") + private[spark] val ADDITIONAL_REMOTE_REPOSITORIES = + ConfigBuilder("spark.sql.additionalRemoteRepositories") + .doc("A comma-delimited string config of the optional additional remote maven mirror " + + "repositories, this can be used for downloading Hive jars in IsolatedClientLoader.") .stringConf - .createWithDefault("https://repo1.maven.org/maven2") + .createWithDefault("") } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 8fb9f85d9bdb6..8d92c30246feb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -55,7 +55,7 @@ private[hive] object IsolatedClientLoader extends Logging { barrierPrefixes: Seq[String] = Seq.empty, sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(hiveMetastoreVersion) - val centralRepo = sparkConf.get(SQLConf.CENTRAL_REPOSITORY) + val remoteRepos = sparkConf.get(SQLConf.ADDITIONAL_REMOTE_REPOSITORIES) // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact // with the given version, we will use Hadoop 2.7 and then will not share Hadoop classes. var _sharesHadoopClasses = sharesHadoopClasses @@ -64,7 +64,7 @@ private[hive] object IsolatedClientLoader extends Logging { } else { val (downloadedFiles, actualHadoopVersion) = try { - (downloadVersion(resolvedVersion, hadoopVersion, ivyPath, centralRepo), hadoopVersion) + (downloadVersion(resolvedVersion, hadoopVersion, ivyPath, remoteRepos), hadoopVersion) } catch { case e: RuntimeException if e.getMessage.contains("hadoop") => // If the error message contains hadoop, it is probably because the hadoop @@ -77,7 +77,7 @@ private[hive] object IsolatedClientLoader extends Logging { "spark.sql.hive.metastore.jars in the production environment.") _sharesHadoopClasses = false (downloadVersion( - resolvedVersion, fallbackVersion, ivyPath, centralRepo), fallbackVersion) + resolvedVersion, fallbackVersion, ivyPath, remoteRepos), fallbackVersion) } resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles) resolvedVersions((resolvedVersion, actualHadoopVersion)) @@ -116,7 +116,7 @@ private[hive] object IsolatedClientLoader extends Logging { version: HiveVersion, hadoopVersion: String, ivyPath: Option[String], - centralRepo: String): Seq[URL] = { + remoteRepos: String): Seq[URL] = { val hiveArtifacts = version.extraDeps ++ Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ @@ -126,7 +126,7 @@ private[hive] object IsolatedClientLoader extends Logging { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), SparkSubmitUtils.buildIvySettings( - Some(centralRepo), + Some(remoteRepos), ivyPath), exclusions = version.exclusions) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index d68a47053f18c..f6c09b84f01d5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -70,7 +70,11 @@ object TestHive // LocalRelation will exercise the optimization rules better by disabling it as // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. - .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName))) + .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + // Add additional remote maven mirror repo here for avoiding the jenkins is blocked + // by maven central. + .set(SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key, + "https://maven-central.storage-download.googleapis.com/repos/central/data/"))) case class TestHiveVersion(hiveClient: HiveClient) From a1502c539af057830192a84ab1051500b8c20a53 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 24 Sep 2019 20:50:14 +0800 Subject: [PATCH 4/7] fix comment --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 88f0dc4a88076..fa2868f9d4192 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1983,7 +1983,8 @@ object SQLConf { private[spark] val ADDITIONAL_REMOTE_REPOSITORIES = ConfigBuilder("spark.sql.additionalRemoteRepositories") .doc("A comma-delimited string config of the optional additional remote maven mirror " + - "repositories, this can be used for downloading Hive jars in IsolatedClientLoader.") + "repositories, this can be used for downloading Hive jars in IsolatedClientLoader " + + "if the default maven central repo is unreachable.") .stringConf .createWithDefault("") } From 7cc0607c22f1bff8e2feb2b3d16ccba653a96cb1 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 25 Sep 2019 11:31:03 +0800 Subject: [PATCH 5/7] Address comments --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- .../scala/org/apache/spark/sql/hive/test/TestHive.scala | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fa2868f9d4192..bf20fd99a830f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1986,7 +1986,8 @@ object SQLConf { "repositories, this can be used for downloading Hive jars in IsolatedClientLoader " + "if the default maven central repo is unreachable.") .stringConf - .createWithDefault("") + .createWithDefault( + "https://maven-central.storage-download.googleapis.com/repos/central/data/") } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index f6c09b84f01d5..d68a47053f18c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -70,11 +70,7 @@ object TestHive // LocalRelation will exercise the optimization rules better by disabling it as // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. - .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) - // Add additional remote maven mirror repo here for avoiding the jenkins is blocked - // by maven central. - .set(SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key, - "https://maven-central.storage-download.googleapis.com/repos/central/data/"))) + .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName))) case class TestHiveVersion(hiveClient: HiveClient) From 5bd630c1504648069a7e0e56c70cf3ab8ebc447f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 25 Sep 2019 20:08:12 +0800 Subject: [PATCH 6/7] address comment --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../apache/spark/sql/hive/client/IsolatedClientLoader.scala | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bf20fd99a830f..2d74c3f4053b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1980,10 +1980,10 @@ object SQLConf { .booleanConf .createWithDefault(true) - private[spark] val ADDITIONAL_REMOTE_REPOSITORIES = - ConfigBuilder("spark.sql.additionalRemoteRepositories") + val ADDITIONAL_REMOTE_REPOSITORIES = + buildConf("spark.sql.additionalRemoteRepositories") .doc("A comma-delimited string config of the optional additional remote maven mirror " + - "repositories, this can be used for downloading Hive jars in IsolatedClientLoader " + + "repositories, this is only used for downloading Hive jars in IsolatedClientLoader " + "if the default maven central repo is unreachable.") .stringConf .createWithDefault( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 8d92c30246feb..be50cb0ded5b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -55,13 +55,13 @@ private[hive] object IsolatedClientLoader extends Logging { barrierPrefixes: Seq[String] = Seq.empty, sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized { val resolvedVersion = hiveVersion(hiveMetastoreVersion) - val remoteRepos = sparkConf.get(SQLConf.ADDITIONAL_REMOTE_REPOSITORIES) // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact // with the given version, we will use Hadoop 2.7 and then will not share Hadoop classes. var _sharesHadoopClasses = sharesHadoopClasses val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) { resolvedVersions((resolvedVersion, hadoopVersion)) } else { + val remoteRepos = sparkConf.get(SQLConf.ADDITIONAL_REMOTE_REPOSITORIES) val (downloadedFiles, actualHadoopVersion) = try { (downloadVersion(resolvedVersion, hadoopVersion, ivyPath, remoteRepos), hadoopVersion) @@ -122,6 +122,7 @@ private[hive] object IsolatedClientLoader extends Logging { .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ Seq("com.google.guava:guava:14.0.1", s"org.apache.hadoop:hadoop-client:$hadoopVersion") + val classpath = quietly { SparkSubmitUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), From 6e6b87c776653459388d2cef3eb8f26e96c664f2 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 26 Sep 2019 11:53:34 +0800 Subject: [PATCH 7/7] fix doc --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2d74c3f4053b0..d4f09abcd8048 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1982,9 +1982,9 @@ object SQLConf { val ADDITIONAL_REMOTE_REPOSITORIES = buildConf("spark.sql.additionalRemoteRepositories") - .doc("A comma-delimited string config of the optional additional remote maven mirror " + - "repositories, this is only used for downloading Hive jars in IsolatedClientLoader " + - "if the default maven central repo is unreachable.") + .doc("A comma-delimited string config of the optional additional remote Maven mirror " + + "repositories. This is only used for downloading Hive jars in IsolatedClientLoader " + + "if the default Maven Central repo is unreachable.") .stringConf .createWithDefault( "https://maven-central.storage-download.googleapis.com/repos/central/data/")