From 1b5df2cb5a69e357b1eb0b19cc05f4b1aedb5701 Mon Sep 17 00:00:00 2001
From: Daniel Darabos
Date: Wed, 19 Mar 2014 19:52:12 +0100
Subject: [PATCH 1/3] Use the Executor's ClassLoader in sc.objectFile().

This makes it possible to read classes from the object file which were
specified in the user-provided jars. (By default ObjectInputStream uses
latestUserDefinedLoader, which may or may not be the right one.)
---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 852ed8fe1fb91..f56c2ac489e61 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -568,7 +568,7 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[T] = {
     sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
-      .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
+      .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Thread.currentThread.getContextClassLoader))
   }

From 61fe0d09d6a7873647820565065e066e544d55c9 Mon Sep 17 00:00:00 2001
From: Daniel Darabos
Date: Thu, 20 Mar 2014 12:21:22 +0100
Subject: [PATCH 2/3] Fix style (line too long).

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f56c2ac489e61..bc0a71c6722f8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -568,7 +568,8 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[T] = {
     sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
-      .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Thread.currentThread.getContextClassLoader))
+      .flatMap(x => Utils.deserialize[Array[T]](
+        x._2.getBytes, Thread.currentThread.getContextClassLoader))
   }

From 45a011a4d78ab1368ee39b1ef7f196a81c578162 Mon Sep 17 00:00:00 2001
From: Daniel Darabos
Date: Tue, 1 Jul 2014 14:02:44 +0200
Subject: [PATCH 3/3] Add test for SPARK-1877. (Fixed in 52eb54d.)

---
 .../scala/org/apache/spark/TestUtils.scala |  4 +--
 .../scala/org/apache/spark/FileSuite.scala | 25 +++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 885c6829a2d72..8ca731038e528 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " { @Override public String toString() { " +
-        "return \"" + value + "\";}}")
+      "public class " + className + " implements java.io.Serializable {" +
+      "  @Override public String toString() { return \"" + value + "\"; }}")

     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 070e974657860..c70e22cf09433 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -177,6 +177,31 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
   }

+  test("object files of classes from a JAR") {
+    val original = Thread.currentThread().getContextClassLoader
+    val className = "FileSuiteObjectFileTest"
+    val jar = TestUtils.createJarWithClasses(Seq(className))
+    val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
+    Thread.currentThread().setContextClassLoader(loader)
+    try {
+      sc = new SparkContext("local", "test")
+      val objs = sc.makeRDD(1 to 3).map { x =>
+        val loader = Thread.currentThread().getContextClassLoader
+        Class.forName(className, true, loader).newInstance()
+      }
+      val outputDir = new File(tempDir, "output").getAbsolutePath
+      objs.saveAsObjectFile(outputDir)
+      // Try reading the output back as an object file
+      val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
+      val output = sc.objectFile[Any](outputDir)
+      assert(output.collect().size === 3)
+      assert(output.collect().head.getClass.getName === className)
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(original)
+    }
+  }
+
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
     sc = new SparkContext("local", "test")
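
Note on the technique behind PATCH 1/3: the fix works by resolving classes
through an explicit ClassLoader instead of ObjectInputStream's default
(latestUserDefinedLoader). A minimal Scala sketch of such a helper, in the
spirit of Utils.deserialize(bytes, loader); the name deserializeWithLoader
and the details below are illustrative, not Spark's actual implementation:

    import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

    // Sketch: deserialize with an explicit ClassLoader, so classes that only
    // exist in user-provided jars (visible to the executor's loader) resolve.
    def deserializeWithLoader[T](bytes: Array[Byte], loader: ClassLoader): T = {
      val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
        // Route class resolution through the supplied loader instead of
        // ObjectInputStream's default latestUserDefinedLoader.
        override def resolveClass(desc: ObjectStreamClass): Class[_] =
          Class.forName(desc.getName, false, loader)
      }
      ois.readObject().asInstanceOf[T]
    }

With a helper of this shape, sc.objectFile() can pass
Thread.currentThread.getContextClassLoader (as PATCH 1/3 does), which on
executors includes the user-provided jars, so a class like the test's
FileSuiteObjectFileTest can be found when the object file is read back.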