Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private[hive] class ClientWrapper(
case hive.v14 => new Shim_v0_14()
}

// Create an internal session state for this ClientWrapper.
val state = {
val original = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
Expand Down Expand Up @@ -131,8 +132,15 @@ private[hive] class ClientWrapper(
*/
private def withHiveState[A](f: => A): A = synchronized {
val original = Thread.currentThread().getContextClassLoader
// This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
// internally override the context class loader of the current thread with the class loader
// associated with the HiveConf in `state`.
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
// Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
// with the HiveConf in `state` to override the context class loader of the current
// thread.
shim.setCurrentSessionState(state)
val ret = try f finally {
Thread.currentThread().setContextClassLoader(original)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand {
val jarURL = new java.io.File(path).toURL
val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
Thread.currentThread.setContextClassLoader(newClassLoader)
org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)

// Add jar to isolated hive classloader
// We need to explicitly set the class loader associated with the conf in executionHive's
// state because this class loader will be used as the context class loader of the current
// thread to execute any Hive command.
// We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
// returns the value of a thread local variable and its HiveConf may not be the HiveConf
// associated with `executionHive.state` (for example, HiveContext is created in one thread
// and then add jar is called from another thread).
hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
// Add jar to isolated hive (metadataHive) class loader.
hiveContext.runSqlHive(s"ADD JAR $path")

// Add jar to executors
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -934,4 +934,32 @@ class SQLQuerySuite extends QueryTest {
sql("set hive.exec.dynamic.partition.mode=strict")
}
}

test("Call add jar in a different thread (SPARK-8306)") {
@volatile var error: Option[Throwable] = None
val thread = new Thread {
override def run() {
// To make sure this test works, this jar should not be loaded in another place.
TestHive.sql(
s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}")
try {
TestHive.sql(
"""
|CREATE TEMPORARY FUNCTION example_max
|AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
""".stripMargin)
} catch {
case throwable: Throwable =>
error = Some(throwable)
}
}
}
thread.start()
thread.join()
error match {
case Some(throwable) =>
fail("CREATE TEMPORARY FUNCTION should not fail.", throwable)
case None => // OK
}
}
}