Skip to content

Commit b834446

Browse files
Marcelo Vanzin authored and squito committed
[SPARK-23209][core] Allow credential manager to work when Hive not available.
The JVM seems to be doing early binding of classes that the Hive provider depends on, causing an error to be thrown before it was caught by the code in the class. The fix wraps the creation of the provider in a try..catch so that the provider can be ignored when dependencies are missing. Added a unit test (which fails without the fix), and also tested that getting tokens still works in a real cluster. Author: Marcelo Vanzin <[email protected]> Closes #20399 from vanzin/SPARK-23209.
1 parent e30b34f commit b834446

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ private[spark] class HadoopDelegationTokenManager(
6464
}
6565

6666
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
67-
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
68-
new HiveDelegationTokenProvider,
69-
new HBaseDelegationTokenProvider)
67+
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
68+
safeCreateProvider(new HiveDelegationTokenProvider) ++
69+
safeCreateProvider(new HBaseDelegationTokenProvider)
7070

7171
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
7272
providers
@@ -75,6 +75,17 @@ private[spark] class HadoopDelegationTokenManager(
7575
.toMap
7676
}
7777

78+
/**
 * Instantiates a delegation token provider, tolerating missing optional dependencies.
 *
 * Some providers (e.g. Hive's) reference classes that may be absent from the classpath;
 * constructing them then fails with a linkage error (`NoClassDefFoundError`), not a plain
 * exception. `Throwable` is caught deliberately so those errors are treated as "provider
 * unavailable" — do not narrow this to `NonFatal`.
 *
 * @param createFn by-name expression that builds the provider; evaluated inside the try
 * @return `Some(provider)` when construction succeeds, `None` when it throws
 */
private def safeCreateProvider(
    createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
  try Some(createFn) catch {
    case error: Throwable =>
      // Missing-dependency failures are expected; log at debug and skip the provider.
      logDebug(s"Failed to load built in provider.", error)
      None
  }
}
88+
7889
def isServiceEnabled(serviceName: String): Boolean = {
7990
val key = providerEnabledConfig.format(serviceName)
8091

core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.deploy.security
1919

20+
import org.apache.commons.io.IOUtils
2021
import org.apache.hadoop.conf.Configuration
2122
import org.apache.hadoop.fs.FileSystem
2223
import org.apache.hadoop.security.Credentials
@@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
110111
creds.getAllTokens.size should be (0)
111112
}
112113

114+
test("SPARK-23209: obtain tokens when Hive classes are not available") {
  // The manager instantiates the Hive provider directly instead of via reflection, so the
  // only way to simulate a Hive-free classpath is to reload the test code through a custom
  // class loader that refuses to resolve Hive class names.
  val originalLoader = Thread.currentThread().getContextClassLoader()
  val hiveHidingLoader = new ClassLoader() {
    override def loadClass(name: String, resolve: Boolean): Class[_] = {
      // Pretend the Hive classes do not exist on the classpath.
      if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) {
        throw new ClassNotFoundException(name)
      }

      if (name.startsWith("java") || name.startsWith("scala")) {
        // Core runtime classes must come from the parent loader for type compatibility.
        originalLoader.loadClass(name)
      } else {
        // Re-define every other class in this loader, so the provider lookup runs in a
        // world where Hive cannot be linked.
        val resourcePath = name.replaceAll("\\.", "/") + ".class"
        val stream = originalLoader.getResourceAsStream(resourcePath)
        if (stream == null) {
          throw new ClassNotFoundException(name)
        }
        val classBytes = IOUtils.toByteArray(stream)
        defineClass(name, classBytes, 0, classBytes.length)
      }
    }
  }

  try {
    Thread.currentThread().setContextClassLoader(hiveHidingLoader)
    // Strip the trailing '$' of the companion object's class name and invoke the test body
    // reflectively inside the Hive-hiding loader.
    val reloadedTest = hiveHidingLoader.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
    reloadedTest.getMethod("runTest").invoke(null)
  } finally {
    Thread.currentThread().setContextClassLoader(originalLoader)
  }
}
149+
113150
/** File systems to request delegation tokens for: just the default FS of this config. */
private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = {
  val defaultFs = FileSystem.get(hadoopConf)
  Set(defaultFs)
}
116153
}
154+
155+
/** Test code for SPARK-23209 to avoid using too much reflection above. */
private object NoHiveTest extends Matchers {

  /**
   * Invoked reflectively from the Hive-hiding class loader. Verifies that the token
   * manager still constructs — and simply omits the Hive provider — when Hive classes
   * cannot be loaded.
   */
  def runTest(): Unit = {
    try {
      val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(),
        _ => Set())
      manager.getServiceDelegationTokenProvider("hive") should be (None)
    } catch {
      case e: Throwable =>
        // Reflective invocation buries failures under layers of wrapper exceptions;
        // rethrow the root cause so the test report is readable.
        var rootCause = e
        while (rootCause.getCause() != null) {
          rootCause = rootCause.getCause()
        }
        throw rootCause
    }
  }

}

0 commit comments

Comments (0)