
Commit 26009d4

Revert "[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client"

This reverts commit 27ad583.

1 parent: be88832

File tree: 4 files changed, +97 -78 lines changed

core/src/main/scala/org/apache/spark/TestUtils.scala
Lines changed: 1 addition & 4 deletions

@@ -193,15 +193,12 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = "",
-      packageName: Option[String] = None): File = {
+      extraCodeBody: String = ""): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
-    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
-         |$packageText
          |public class $className $extendsText $implementsText {
          |  @Override public String toString() { return "$toStringValue"; }
          |
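For reference, a minimal standalone sketch (not part of the diff) of the string template this helper interpolates. The object name GeneratedSourceDemo and the javaSource signature are invented for illustration, and the reverted packageName handling is deliberately omitted:

import scala.Predef._

object GeneratedSourceDemo {
  // Mirror the template above: build a tiny Java class source from a class
  // name, an optional base class, and the interfaces to implement.
  def javaSource(
      className: String,
      toStringValue: String,
      baseClass: Option[String],
      implementsClasses: Seq[String]): String = {
    val extendsText = baseClass.map(c => s" extends $c").getOrElse("")
    val implementsText =
      "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
    s"""
       |public class $className $extendsText $implementsText {
       |  @Override public String toString() { return "$toStringValue"; }
       |}
     """.stripMargin
  }

  def main(args: Array[String]): Unit = {
    // Prints a compilable Java class named Hive whose toString returns "fake".
    println(javaSource("Hive", "fake", None, Seq.empty))
  }
}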

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Lines changed: 50 additions & 3 deletions

@@ -18,17 +18,19 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.URL
+import java.net.{URL, URLClassLoader}
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
 import org.apache.hive.common.util.HiveVersionInfo
 
@@ -44,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
 
 
 private[spark] object HiveUtils extends Logging {
@@ -319,6 +321,22 @@ private[spark] object HiveUtils extends Logging {
     (commonTimeVars ++ hardcodingTimeVars).toMap
   }
 
+  /**
+   * Check current Thread's SessionState type
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
   *
@@ -391,14 +409,43 @@
           s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
+      // We recursively find all jars in the class loader chain,
+      // starting from the given classLoader.
+      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
+        case null => Array.empty[URL]
+        case childFirst: ChildFirstURLClassLoader =>
+          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
+        case urlClassLoader: URLClassLoader =>
+          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
+        case other => allJars(other.getParent)
+      }
+
+      val classLoader = Utils.getContextOrSparkClassLoader
+      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
+        // so it won't match the case in allJars. It no longer exposes URLs of
+        // the system classpath
+        Array.empty[URL]
+      } else {
+        val loadedJars = allJars(classLoader)
+        // Verify at least one jar was found
+        if (loadedJars.length == 0) {
+          throw new IllegalArgumentException(
+            "Unable to locate hive jars to connect to metastore. " +
+              s"Please set ${HIVE_METASTORE_JARS.key}.")
+        }
+        loadedJars
+      }
+
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
+        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = false,
+        isolationOn = !isCliSessionState(),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
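The two helpers restored above can be read independently of Spark. Below is a hedged, standalone sketch of the same ideas: walking a superclass chain by name, which is how isCliSessionState detects a CliSessionState without linking against hive-cli, and collecting jar URLs from a classloader chain, as allJars does minus the Spark-specific ChildFirstURLClassLoader case. The object name HiveClientLoadingSketch and the method isInstanceOfByName are invented for illustration:

import java.net.{URL, URLClassLoader}

object HiveClientLoadingSketch {

  // Walk the superclass chain of obj looking for a class with the given
  // fully qualified name. This mirrors how the restored isCliSessionState
  // recognises a CliSessionState without a compile-time dependency on hive-cli.
  def isInstanceOfByName(obj: AnyRef, targetClassName: String): Boolean = {
    var temp: Class[_] = if (obj != null) obj.getClass else null
    var found = false
    while (temp != null && !found) {
      found = temp.getName == targetClassName
      temp = temp.getSuperclass
    }
    found
  }

  // Collect every URL exposed by URLClassLoaders in the chain. On Java 9+
  // the application classloader is no longer a URLClassLoader, so nothing is
  // collected for it, which is why the restored code returns an empty jar
  // list in that branch.
  def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
    case null => Array.empty[URL]
    case urlClassLoader: URLClassLoader =>
      urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
    case other => allJars(other.getParent)
  }

  def main(args: Array[String]): Unit = {
    // true: java.lang.Object is in the superclass chain of String.
    println(isInstanceOfByName("a string", "java.lang.Object"))
    // On Java 8 this prints classpath entries; on Java 9+ it prints nothing.
    allJars(Thread.currentThread().getContextClassLoader).foreach(println)
  }
}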

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
Lines changed: 44 additions & 39 deletions

@@ -232,46 +232,51 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader =
       if (isolationOn) {
-        val rootClassLoader: ClassLoader =
-          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-            // In Java 9, the boot classloader can see few JDK classes. The intended parent
-            // classloader for delegation is now the platform classloader.
-            // See http://java9.wtf/class-loading/
-            val platformCL =
-            classOf[ClassLoader].getMethod("getPlatformClassLoader").
-              invoke(null).asInstanceOf[ClassLoader]
-            // Check to make sure that the root classloader does not know about Hive.
-            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-            platformCL
-          } else {
-            // The boot classloader is represented by null (the instance itself isn't accessible)
-            // and before Java 9 can see all JDK classes
-            null
-          }
-        new URLClassLoader(allJars, rootClassLoader) {
-          override def loadClass(name: String, resolve: Boolean): Class[_] = {
-            val loaded = findLoadedClass(name)
-            if (loaded == null) doLoadClass(name, resolve) else loaded
-          }
-          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-            val classFileName = name.replaceAll("\\.", "/") + ".class"
-            if (isBarrierClass(name)) {
-              // For barrier classes, we construct a new copy of the class.
-              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-              defineClass(name, bytes, 0, bytes.length)
-            } else if (!isSharedClass(name)) {
-              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-              super.loadClass(name, resolve)
+        if (allJars.isEmpty) {
+          // See HiveUtils; this is the Java 9+ + builtin mode scenario
+          baseClassLoader
+        } else {
+          val rootClassLoader: ClassLoader =
+            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+              // In Java 9, the boot classloader can see few JDK classes. The intended parent
+              // classloader for delegation is now the platform classloader.
+              // See http://java9.wtf/class-loading/
+              val platformCL =
+              classOf[ClassLoader].getMethod("getPlatformClassLoader").
+                invoke(null).asInstanceOf[ClassLoader]
+              // Check to make sure that the root classloader does not know about Hive.
+              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+              platformCL
             } else {
-              // For shared classes, we delegate to baseClassLoader, but fall back in case the
-              // class is not found.
-              logDebug(s"shared class: $name")
-              try {
-                baseClassLoader.loadClass(name)
-              } catch {
-                case _: ClassNotFoundException =>
-                  super.loadClass(name, resolve)
+              // The boot classloader is represented by null (the instance itself isn't accessible)
+              // and before Java 9 can see all JDK classes
+              null
+            }
+          new URLClassLoader(allJars, rootClassLoader) {
+            override def loadClass(name: String, resolve: Boolean): Class[_] = {
+              val loaded = findLoadedClass(name)
+              if (loaded == null) doLoadClass(name, resolve) else loaded
+            }
+            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+              val classFileName = name.replaceAll("\\.", "/") + ".class"
+              if (isBarrierClass(name)) {
+                // For barrier classes, we construct a new copy of the class.
+                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+                defineClass(name, bytes, 0, bytes.length)
+              } else if (!isSharedClass(name)) {
+                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+                super.loadClass(name, resolve)
+              } else {
+                // For shared classes, we delegate to baseClassLoader, but fall back in case the
+                // class is not found.
+                logDebug(s"shared class: $name")
+                try {
+                  baseClassLoader.loadClass(name)
+                } catch {
+                  case _: ClassNotFoundException =>
+                    super.loadClass(name, resolve)
+                }
               }
             }
           }
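To make the restored delegation logic easier to follow outside Spark, here is a hedged, stripped-down sketch of the same pattern: an isolated URLClassLoader that serves "shared" classes from the application classloader and resolves everything else against its own jar list. The class name IsolationSketch and the sharedPrefixes parameter are invented for illustration; the real loader additionally re-defines "barrier" classes from raw bytes, as the diff shows.

import java.net.{URL, URLClassLoader}

// Parent is null (the boot classloader), so nothing leaks in implicitly;
// sharing is an explicit decision made per class name.
class IsolationSketch(jars: Array[URL], sharedPrefixes: Seq[String])
  extends URLClassLoader(jars, null) {

  // The loader that loaded this class, i.e. the application classloader.
  private val appLoader = getClass.getClassLoader

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    val loaded = findLoadedClass(name)
    if (loaded != null) {
      loaded
    } else if (sharedPrefixes.exists(p => name.startsWith(p))) {
      // Shared classes come from the application classloader so both sides
      // see the same Class instances (e.g. logging or Hadoop config classes).
      appLoader.loadClass(name)
    } else {
      // Everything else resolves against the isolated jar list only.
      super.loadClass(name, resolve)
    }
  }
}

object IsolationSketch {
  def main(args: Array[String]): Unit = {
    val loader = new IsolationSketch(Array.empty, Seq("java.", "scala."))
    // java.* is shared, so this resolves even with an empty jar list.
    println(loader.loadClass("java.lang.String"))
  }
}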

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
Lines changed: 2 additions & 32 deletions

@@ -17,19 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
+import org.apache.spark.util.ChildFirstURLClassLoader
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
@@ -81,32 +77,6 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
-  test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
-    withTempDir { tmpDir =>
-      val classFile = TestUtils.createCompiledClass(
-        "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
-
-      val jarFile = new File(tmpDir, "hive-fake.jar")
-      TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))
-
-      val conf = new SparkConf
-      val contextClassLoader = Thread.currentThread().getContextClassLoader
-      val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
-      try {
-        Thread.currentThread().setContextClassLoader(loader)
-        val client = HiveUtils.newClientForMetadata(
-          conf,
-          SparkHadoopUtil.newConfiguration(conf),
-          HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
-        client.createDatabase(
-          CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
-          ignoreIfExists = true)
-      } finally {
-        Thread.currentThread().setContextClassLoader(contextClassLoader)
-      }
-    }
-  }
-
   test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
     // Test default value
     val defaultConf = new Configuration
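The deleted test hinged on temporarily swapping the thread's context classloader. As a reference for that pattern only, a minimal sketch; the helper name withContextClassLoader is invented and not part of Spark's test utilities:

import java.net.{URL, URLClassLoader}

object ContextClassLoaderSketch {
  // Install a throwaway classloader as the thread's context classloader for
  // the duration of body, then restore the original even if body throws.
  def withContextClassLoader[T](urls: Array[URL])(body: => T): T = {
    val original = Thread.currentThread().getContextClassLoader
    val loader = new URLClassLoader(urls, original)
    Thread.currentThread().setContextClassLoader(loader)
    try {
      body
    } finally {
      // Restore unconditionally so a failing body cannot leak the loader.
      Thread.currentThread().setContextClassLoader(original)
    }
  }

  def main(args: Array[String]): Unit = {
    withContextClassLoader(Array.empty[URL]) {
      // Inside the block, the context classloader is the temporary one.
      println(Thread.currentThread().getContextClassLoader)
    }
  }
}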
