
Commit 2245c87

darabos authored and pwendell committed
Use the Executor's ClassLoader in sc.objectFile().
This makes it possible to read classes from the object file which were specified in the user-provided jars. (By default ObjectInputStream uses latestUserDefinedLoader, which may or may not be the right one.)

I created this because I ran into the following problem. I have x: RDD[X], with X defined in the jar that I provide to SparkContext. I save it with x.saveAsObjectFile("x") and try to load it with sc.objectFile[X]("x"). It fails with ClassNotFoundException.

After a good while of debugging I figured out that Utils.deserialize() most likely uses the ClassLoader of Utils. This is the bootstrap ClassLoader, so it is not aware of the dynamically added jars. This patch fixes the issue.

A more robust fix would be to always default to Thread.currentThread.getContextClassLoader, which would prevent this problem from biting anyone in the future. It would be a bit harder to test, though. On the topic of testing: if you'd like to see tests for this, I will need some hand-holding. Thanks!

Author: Daniel Darabos <[email protected]>

Closes apache#181 from darabos/master and squashes the following commits:

45a011a [Daniel Darabos] Add test for SPARK-1877. (Fixed in 52eb54d.)
e13e090 [Daniel Darabos] Merge branch 'master' of https://github.com/apache/spark
61fe0d0 [Daniel Darabos] Fix style (line too long).
1b5df2c [Daniel Darabos] Use the Executor's ClassLoader in sc.objectFile(). This makes it possible to read classes from the object file which were specified in the user-provided jars. (By default ObjectInputStream uses latestUserDefinedLoader, which may or may not be the right one.)
1 parent d38887b · commit 2245c87
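For readers skimming the message above, here is a compact sketch of the failure mode being described. Note that in the real scenario the class X lives in a separate application jar passed to SparkContext, which is exactly what makes the default ClassLoader miss it; defining it inline as below would mask the bug, so treat the names here as hypothetical stand-ins rather than a faithful reproduction.

import org.apache.spark.SparkContext

// Hypothetical stand-in: in the failing scenario this class is compiled
// into a separate application jar, not onto Spark's own classpath.
case class X(i: Int)

val sc = new SparkContext("local", "objectFile-repro")
val x = sc.makeRDD(Seq(X(1), X(2)))  // x: RDD[X]
x.saveAsObjectFile("x")

// Before this patch, this step failed with ClassNotFoundException when X
// came from a user-provided jar: deserialization used Utils' own
// ClassLoader, which never sees dynamically added jars.
val loaded = sc.objectFile[X]("x")
println(loaded.collect().mkString(", "))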

2 files changed (+27 lines, −2 lines)

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " { @Override public String toString() { " +
-      "return \"" + value + "\";}}")
+      "public class " + className + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + value + "\"; }}")

     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
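The TestUtils change above makes the generated test class implement java.io.Serializable. That matters because saveAsObjectFile writes RDD elements through plain Java serialization, which rejects any object whose class lacks the marker interface. A small self-contained illustration (the class and method names here are mine, not from the patch):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Plain                        // no Serializable: write fails
class Marked extends Serializable  // marker interface present: write succeeds

def write(o: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try oos.writeObject(o) finally oos.close()
}

write(new Marked)                  // fine
try write(new Plain)               // throws NotSerializableException
catch { case e: NotSerializableException => println(s"as expected: $e") }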

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -177,6 +177,31 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
   }

+  test("object files of classes from a JAR") {
+    val original = Thread.currentThread().getContextClassLoader
+    val className = "FileSuiteObjectFileTest"
+    val jar = TestUtils.createJarWithClasses(Seq(className))
+    val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
+    Thread.currentThread().setContextClassLoader(loader)
+    try {
+      sc = new SparkContext("local", "test")
+      val objs = sc.makeRDD(1 to 3).map { x =>
+        val loader = Thread.currentThread().getContextClassLoader
+        Class.forName(className, true, loader).newInstance()
+      }
+      val outputDir = new File(tempDir, "output").getAbsolutePath
+      objs.saveAsObjectFile(outputDir)
+      // Try reading the output back as an object file
+      val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
+      val output = sc.objectFile[Any](outputDir)
+      assert(output.collect().size === 3)
+      assert(output.collect().head.getClass.getName === className)
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(original)
+    }
+  }
+
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
     sc = new SparkContext("local", "test")
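Note that this diff only touches test code; the fix itself lives in sc.objectFile's deserialization path. The essence is to resolve classes against an explicitly supplied ClassLoader instead of ObjectInputStream's default latestUserDefinedLoader. A minimal sketch in that spirit (the method name deserializeWith is mine; the commit's actual change routes sc.objectFile through a loader-aware Utils.deserialize):

import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

// Resolve deserialized classes against a caller-supplied ClassLoader rather
// than ObjectInputStream's default, which may not see jars added
// dynamically through SparkContext.
def deserializeWith[T](bytes: Array[Byte], loader: ClassLoader): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      Class.forName(desc.getName, false, loader)
  }
  try ois.readObject().asInstanceOf[T] finally ois.close()
}

Passing Thread.currentThread().getContextClassLoader (or Utils.getContextOrSparkClassLoader, as the test above does) as the loader is then enough for classes from user-provided jars to resolve.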
