5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -193,12 +193,15 @@ private[spark] object TestUtils {
baseClass: String = null,
classpathUrls: Seq[URL] = Seq.empty,
implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
extraCodeBody: String = "",
packageName: Option[String] = None): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText =
"implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
val sourceFile = new JavaSourceFromString(className,
s"""
|$packageText
|public class $className $extendsText $implementsText {
| @Override public String toString() { return "$toStringValue"; }
|
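For reviewers unfamiliar with the helper, here is a minimal sketch of how the new packageName parameter is meant to be used from Spark's own test code (the class name, package, and directory are illustrative; the real call site is in the HiveUtilsSuite change further down):

import java.io.File

import org.apache.spark.TestUtils

// Compiles a generated source that now starts with "package com.example;" into tmpDir;
// omitting packageName keeps the existing behaviour of generating a package-less class.
val tmpDir = new File("/tmp/spark-testutils-example") // illustrative path
tmpDir.mkdirs()
val classFile: File = TestUtils.createCompiledClass(
  "Fake", tmpDir, packageName = Some("com.example"))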
53 changes: 3 additions & 50 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,19 +18,17 @@
package org.apache.spark.sql.hive

import java.io.File
import java.net.{URL, URLClassLoader}
import java.net.URL
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.Try

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.util.VersionInfo
import org.apache.hive.common.util.HiveVersionInfo

@@ -46,7 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.types._
import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
import org.apache.spark.util.Utils


private[spark] object HiveUtils extends Logging {
@@ -321,22 +319,6 @@ private[spark] object HiveUtils extends Logging {
(commonTimeVars ++ hardcodingTimeVars).toMap
}

/**
* Check current Thread's SessionState type
* @return true when SessionState.get returns an instance of CliSessionState,
* false when it gets non-CliSessionState instance or null
*/
def isCliSessionState(): Boolean = {
val state = SessionState.get
var temp: Class[_] = if (state != null) state.getClass else null
var found = false
while (temp != null && !found) {
found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
temp = temp.getSuperclass
}
found
}

/**
* Create a [[HiveClient]] used for execution.
*
@@ -409,43 +391,14 @@ private[spark] object HiveUtils extends Logging {
s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
}

// We recursively find all jars in the class loader chain,
// starting from the given classLoader.
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
case null => Array.empty[URL]
case childFirst: ChildFirstURLClassLoader =>
childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
case urlClassLoader: URLClassLoader =>
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
case other => allJars(other.getParent)
}

val classLoader = Utils.getContextOrSparkClassLoader
val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
// so it won't match the case in allJars. It no longer exposes URLs of
// the system classpath
Array.empty[URL]
} else {
val loadedJars = allJars(classLoader)
// Verify at least one jar was found
if (loadedJars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
s"Please set ${HIVE_METASTORE_JARS.key}.")
}
loadedJars
}

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
isolationOn = !isCliSessionState(),
isolationOn = false,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
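For context, the branch edited above is the one taken when spark.sql.hive.metastore.jars is left at its default value "builtin", i.e. the metastore client is built from the Hive classes bundled with Spark. A minimal sketch of a session that exercises this path (master, app name, and the query are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                                  // illustrative
  .appName("builtin-metastore-example")                // illustrative
  .config("spark.sql.hive.metastore.jars", "builtin")  // the default setting
  .enableHiveSupport()
  .getOrCreate()

// With isolationOn = false in this mode, the builtin Hive classes are the ones used
// for the metastore client even if same-named classes sit on the user classpath.
spark.sql("SHOW DATABASES").show()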
@@ -232,51 +232,46 @@ private[hive] class IsolatedClientLoader(
  private[hive] val classLoader: MutableURLClassLoader = {
    val isolatedClassLoader =
      if (isolationOn) {
        if (allJars.isEmpty) {
          // See HiveUtils; this is the Java 9+ + builtin mode scenario
          baseClassLoader
        } else {
        val rootClassLoader: ClassLoader =
          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
            // In Java 9, the boot classloader can see few JDK classes. The intended parent
            // classloader for delegation is now the platform classloader.
            // See http://java9.wtf/class-loading/
            val platformCL =
              classOf[ClassLoader].getMethod("getPlatformClassLoader").
                invoke(null).asInstanceOf[ClassLoader]
            // Check to make sure that the root classloader does not know about Hive.
            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
            platformCL
          } else {
            // The boot classloader is represented by null (the instance itself isn't accessible)
            // and before Java 9 can see all JDK classes
            null
          }
        new URLClassLoader(allJars, rootClassLoader) {
          override def loadClass(name: String, resolve: Boolean): Class[_] = {
            val loaded = findLoadedClass(name)
            if (loaded == null) doLoadClass(name, resolve) else loaded
          }
          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
            val classFileName = name.replaceAll("\\.", "/") + ".class"
            if (isBarrierClass(name)) {
              // For barrier classes, we construct a new copy of the class.
              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
              defineClass(name, bytes, 0, bytes.length)
            } else if (!isSharedClass(name)) {
              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
              super.loadClass(name, resolve)
            } else {
              // For shared classes, we delegate to baseClassLoader, but fall back in case the
              // class is not found.
              logDebug(s"shared class: $name")
              try {
                baseClassLoader.loadClass(name)
              } catch {
                case _: ClassNotFoundException =>
                  super.loadClass(name, resolve)
              }
            }
          }
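Since the observable effect of the loader change is which copy of the Hive classes a session ends up using, a quick diagnostic (not part of the patch) is to ask a loaded class for its code source:

// Prints the jar or directory that actually provided the Hive class in the current
// classloader; getCodeSource may be null for bootstrap-loaded classes, hence the Option.
val hiveClass = Class.forName("org.apache.hadoop.hive.ql.metadata.Hive")
val location = Option(hiveClass.getProtectionDomain.getCodeSource).map(_.getLocation)
println(s"org.apache.hadoop.hive.ql.metadata.Hive loaded from: $location")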
@@ -17,15 +17,19 @@

package org.apache.spark.sql.hive

import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.ChildFirstURLClassLoader
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}

class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

@@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
withTempDir { tmpDir =>
val classFile = TestUtils.createCompiledClass(
"Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))

val jarFile = new File(tmpDir, "hive-fake.jar")
TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))

val conf = new SparkConf
val contextClassLoader = Thread.currentThread().getContextClassLoader
val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
try {
Thread.currentThread().setContextClassLoader(loader)
val client = HiveUtils.newClientForMetadata(
conf,
SparkHadoopUtil.newConfiguration(conf),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
client.createDatabase(
CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
ignoreIfExists = true)
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader)
}
}
}

test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
// Test default value
val defaultConf = new Configuration