Skip to content

Commit 64813b7

Browse files
committed
[SPARK-11998][SQL][TEST-HADOOP2.0] When downloading Hadoop artifacts from maven, we need to try to download the version that is used by Spark
If we need to download Hive/Hadoop artifacts, try to download a Hadoop that matches the Hadoop used by Spark. If the Hadoop artifact cannot be resolved (e.g. Hadoop version is a vendor specific version like 2.0.0-cdh4.1.1), we will use Hadoop 2.4.0 (we used to hard code this version as the Hadoop that we will download from maven) and we will not share Hadoop classes. I tested this matching logic on my laptop with the following confs (these confs are used by our builds). All tests are good. ``` build/sbt -Phadoop-1 -Dhadoop.version=1.2.1 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Phadoop-1 -Dhadoop.version=2.0.0-mr1-cdh4.1.1 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Pyarn -Phadoop-2.2 -Pkinesis-asl -Phive-thriftserver -Phive build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive-thriftserver -Phive ``` Author: Yin Huai <[email protected]> Closes #9979 from yhuai/versionsSuite. (cherry picked from commit ad76562) Signed-off-by: Yin Huai <[email protected]>
1 parent ce90bbe commit 64813b7

File tree

3 files changed

+72
-17
lines changed

3 files changed

+72
-17
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
3535
import org.apache.hadoop.hive.ql.metadata.Table
3636
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
3737
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
38+
import org.apache.hadoop.util.VersionInfo
3839

3940
import org.apache.spark.api.java.JavaSparkContext
4041
import org.apache.spark.sql.SQLConf.SQLConfEntry
@@ -288,7 +289,8 @@ class HiveContext private[hive](
288289
logInfo(
289290
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
290291
IsolatedClientLoader.forVersion(
291-
version = hiveMetastoreVersion,
292+
hiveMetastoreVersion = hiveMetastoreVersion,
293+
hadoopVersion = VersionInfo.getVersion,
292294
config = allConfig,
293295
barrierPrefixes = hiveMetastoreBarrierPrefixes,
294296
sharedPrefixes = hiveMetastoreSharedPrefixes)

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,51 @@ import org.apache.spark.sql.hive.HiveContext
3434
import org.apache.spark.util.{MutableURLClassLoader, Utils}
3535

3636
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
37-
private[hive] object IsolatedClientLoader {
37+
private[hive] object IsolatedClientLoader extends Logging {
3838
/**
3939
* Creates isolated Hive client loaders by downloading the requested version from maven.
4040
*/
4141
def forVersion(
42-
version: String,
42+
hiveMetastoreVersion: String,
43+
hadoopVersion: String,
4344
config: Map[String, String] = Map.empty,
4445
ivyPath: Option[String] = None,
4546
sharedPrefixes: Seq[String] = Seq.empty,
4647
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
47-
val resolvedVersion = hiveVersion(version)
48-
val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
49-
downloadVersion(resolvedVersion, ivyPath))
48+
val resolvedVersion = hiveVersion(hiveMetastoreVersion)
49+
// We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
50+
// with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes.
51+
var sharesHadoopClasses = true
52+
val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
53+
resolvedVersions((resolvedVersion, hadoopVersion))
54+
} else {
55+
val (downloadedFiles, actualHadoopVersion) =
56+
try {
57+
(downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
58+
} catch {
59+
case e: RuntimeException if e.getMessage.contains("hadoop") =>
60+
// If the error message contains hadoop, it is probably because the hadoop
61+
// version cannot be resolved (e.g. it is a vendor specific version like
62+
// 2.0.0-cdh4.1.1). If it is the case, we will try just
63+
// "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
64+
// is used just because we used to hard code it as the hadoop artifact to download.
65+
logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
66+
s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
67+
"Hadoop classes will not be shared between Spark and Hive metastore client. " +
68+
"It is recommended to set jars used by Hive metastore client through " +
69+
"spark.sql.hive.metastore.jars in the production environment.")
70+
sharesHadoopClasses = false
71+
(downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
72+
}
73+
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
74+
resolvedVersions((resolvedVersion, actualHadoopVersion))
75+
}
76+
5077
new IsolatedClientLoader(
51-
version = hiveVersion(version),
78+
version = hiveVersion(hiveMetastoreVersion),
5279
execJars = files,
5380
config = config,
81+
sharesHadoopClasses = sharesHadoopClasses,
5482
sharedPrefixes = sharedPrefixes,
5583
barrierPrefixes = barrierPrefixes)
5684
}
@@ -64,12 +92,15 @@ private[hive] object IsolatedClientLoader {
6492
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
6593
}
6694

67-
private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
95+
private def downloadVersion(
96+
version: HiveVersion,
97+
hadoopVersion: String,
98+
ivyPath: Option[String]): Seq[URL] = {
6899
val hiveArtifacts = version.extraDeps ++
69100
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
70101
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
71102
Seq("com.google.guava:guava:14.0.1",
72-
"org.apache.hadoop:hadoop-client:2.4.0")
103+
s"org.apache.hadoop:hadoop-client:$hadoopVersion")
73104

74105
val classpath = quietly {
75106
SparkSubmitUtils.resolveMavenCoordinates(
@@ -86,7 +117,10 @@ private[hive] object IsolatedClientLoader {
86117
tempDir.listFiles().map(_.toURI.toURL)
87118
}
88119

89-
private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
120+
// A map from a given pair of HiveVersion and Hadoop version to jar files.
121+
// It is only used by forVersion.
122+
private val resolvedVersions =
123+
new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
90124
}
91125

92126
/**
@@ -106,6 +140,7 @@ private[hive] object IsolatedClientLoader {
106140
* @param config A set of options that will be added to the HiveConf of the constructed client.
107141
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
108142
*                  true unless loading the version of hive that is on Spark's classloader.
143+
* @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and the
109144
* @param rootClassLoader The system root classloader. Must not know about Hive classes.
110145
* @param baseClassLoader The spark classloader that is used to load shared classes.
111146
*/
@@ -114,6 +149,7 @@ private[hive] class IsolatedClientLoader(
114149
val execJars: Seq[URL] = Seq.empty,
115150
val config: Map[String, String] = Map.empty,
116151
val isolationOn: Boolean = true,
152+
val sharesHadoopClasses: Boolean = true,
117153
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
118154
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
119155
val sharedPrefixes: Seq[String] = Seq.empty,
@@ -126,16 +162,20 @@ private[hive] class IsolatedClientLoader(
126162
/** All jars used by the hive specific classloader. */
127163
protected def allJars = execJars.toArray
128164

129-
protected def isSharedClass(name: String): Boolean =
165+
protected def isSharedClass(name: String): Boolean = {
166+
val isHadoopClass =
167+
name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")
168+
130169
name.contains("slf4j") ||
131170
name.contains("log4j") ||
132171
name.startsWith("org.apache.spark.") ||
133-
(name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")) ||
172+
(sharesHadoopClasses && isHadoopClass) ||
134173
name.startsWith("scala.") ||
135174
(name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
136175
name.startsWith("java.lang.") ||
137176
name.startsWith("java.net") ||
138177
sharedPrefixes.exists(name.startsWith)
178+
}
139179

140180
/** True if `name` refers to a spark class that must see specific version of Hive. */
141181
protected def isBarrierClass(name: String): Boolean =

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client
1919

2020
import java.io.File
2121

22+
import org.apache.hadoop.util.VersionInfo
23+
2224
import org.apache.spark.sql.hive.HiveContext
2325
import org.apache.spark.{Logging, SparkFunSuite}
2426
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
@@ -53,9 +55,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
5355
}
5456

5557
test("success sanity check") {
56-
val badClient = IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
57-
buildConf(),
58-
ivyPath).createClient()
58+
val badClient = IsolatedClientLoader.forVersion(
59+
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
60+
hadoopVersion = VersionInfo.getVersion,
61+
config = buildConf(),
62+
ivyPath = ivyPath).createClient()
5963
val db = new HiveDatabase("default", "")
6064
badClient.createDatabase(db)
6165
}
@@ -85,7 +89,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
8589
ignore("failure sanity check") {
8690
val e = intercept[Throwable] {
8791
val badClient = quietly {
88-
IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).createClient()
92+
IsolatedClientLoader.forVersion(
93+
hiveMetastoreVersion = "13",
94+
hadoopVersion = VersionInfo.getVersion,
95+
config = buildConf(),
96+
ivyPath = ivyPath).createClient()
8997
}
9098
}
9199
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
@@ -99,7 +107,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
99107
test(s"$version: create client") {
100108
client = null
101109
System.gc() // Hack to avoid SEGV on some JVM versions.
102-
client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).createClient()
110+
client =
111+
IsolatedClientLoader.forVersion(
112+
hiveMetastoreVersion = version,
113+
hadoopVersion = VersionInfo.getVersion,
114+
config = buildConf(),
115+
ivyPath = ivyPath).createClient()
103116
}
104117

105118
test(s"$version: createDatabase") {

0 commit comments

Comments
 (0)