Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLConf.SQLConfEntry
Expand Down Expand Up @@ -288,7 +289,8 @@ class HiveContext private[hive](
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(
version = hiveMetastoreVersion,
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,51 @@ import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.{MutableURLClassLoader, Utils}

/** Factory for `IsolatedClientLoader` with specific versions of hive. */
private[hive] object IsolatedClientLoader {
private[hive] object IsolatedClientLoader extends Logging {
/**
* Creates isolated Hive client loaders by downloading the requested version from maven.
*/
def forVersion(
version: String,
hiveMetastoreVersion: String,
hadoopVersion: String,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(version)
val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
downloadVersion(resolvedVersion, ivyPath))
val resolvedVersion = hiveVersion(hiveMetastoreVersion)
// We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
// with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes.
var sharesHadoopClasses = true
val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
resolvedVersions((resolvedVersion, hadoopVersion))
} else {
val (downloadedFiles, actualHadoopVersion) =
try {
(downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
} catch {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
// version cannot be resolved (e.g. it is a vendor specific version like
// 2.0.0-cdh4.1.1). If it is the case, we will try just
// "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
// is used just because we used to hard code it as the hadoop artifact to download.
logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
"Hadoop classes will not be shared between Spark and Hive metastore client. " +
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
resolvedVersions((resolvedVersion, actualHadoopVersion))
}

new IsolatedClientLoader(
version = hiveVersion(version),
version = hiveVersion(hiveMetastoreVersion),
execJars = files,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
sharedPrefixes = sharedPrefixes,
barrierPrefixes = barrierPrefixes)
}
Expand All @@ -64,12 +92,15 @@ private[hive] object IsolatedClientLoader {
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
}

private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
private def downloadVersion(
version: HiveVersion,
hadoopVersion: String,
ivyPath: Option[String]): Seq[URL] = {
val hiveArtifacts = version.extraDeps ++
Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
.map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
Seq("com.google.guava:guava:14.0.1",
"org.apache.hadoop:hadoop-client:2.4.0")
s"org.apache.hadoop:hadoop-client:$hadoopVersion")

val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
Expand All @@ -86,7 +117,10 @@ private[hive] object IsolatedClientLoader {
tempDir.listFiles().map(_.toURI.toURL)
}

private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
// A map from a given pair of HiveVersion and Hadoop version to jar files.
// It is only used by forVersion.
private val resolvedVersions =
new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marmbrus This should be a val, right?

}

/**
Expand All @@ -106,6 +140,7 @@ private[hive] object IsolatedClientLoader {
* @param config A set of options that will be added to the HiveConf of the constructed client.
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Sparks classloader.
* @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and
* @param rootClassLoader The system root classloader. Must not know about Hive classes.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*/
Expand All @@ -114,6 +149,7 @@ private[hive] class IsolatedClientLoader(
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val sharesHadoopClasses: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
val sharedPrefixes: Seq[String] = Seq.empty,
Expand All @@ -126,16 +162,20 @@ private[hive] class IsolatedClientLoader(
/** All jars used by the hive specific classloader. */
protected def allJars = execJars.toArray

protected def isSharedClass(name: String): Boolean =
protected def isSharedClass(name: String): Boolean = {
val isHadoopClass =
name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")

name.contains("slf4j") ||
name.contains("log4j") ||
name.startsWith("org.apache.spark.") ||
(name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")) ||
(sharesHadoopClasses && isHadoopClass) ||
name.startsWith("scala.") ||
(name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
name.startsWith("java.lang.") ||
name.startsWith("java.net") ||
sharedPrefixes.exists(name.startsWith)
}

/** True if `name` refers to a spark class that must see specific version of Hive. */
protected def isBarrierClass(name: String): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client

import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
Expand Down Expand Up @@ -53,9 +55,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
}

test("success sanity check") {
val badClient = IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
buildConf(),
ivyPath).createClient()
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new HiveDatabase("default", "")
badClient.createDatabase(db)
}
Expand Down Expand Up @@ -85,7 +89,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
ignore("failure sanity check") {
val e = intercept[Throwable] {
val badClient = quietly {
IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).createClient()
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
Expand All @@ -99,7 +107,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: create client") {
client = null
System.gc() // Hack to avoid SEGV on some JVM versions.
client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).createClient()
client =
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
config = buildConf(),
ivyPath = ivyPath).createClient()
}

test(s"$version: createDatabase") {
Expand Down