From 2082de8bbffaba841bfa98f912ccfb6bbebb3670 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 13 Dec 2013 09:22:56 -0800 Subject: [PATCH 01/14] Print out the class that was not found --- .../scala/org/apache/spark/scheduler/TaskResultGetter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index cb4ad4ae9350c..75f3d789eb42f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader - taskSetManager.abort("ClassNotFound with classloader: " + loader) + taskSetManager.abort("ClassNotFound [" + cnf.getMessage + "] with classloader: " + loader) case ex: Throwable => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) } From 55a91ce31d2f8b0c68c5721fb8dfe815f355c713 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 13 Dec 2013 15:40:14 -0800 Subject: [PATCH 02/14] Fix bug where classes in jars added via addJar() are not available to the driver Adds a driver-specific class loader that knows about the jars added via addJar(), and initialize it first thing in SparkEnv, so that all other ThreadPool/ActorSystem initialization will take advantage of the classloader, and user-added jars are made available to all Spark subsystems. --- .../scala/org/apache/spark/SparkContext.scala | 23 ++++++++++++++++++- .../scala/org/apache/spark/SparkEnv.scala | 16 +++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b23accbbb9410..4e63d05408bbd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -18,7 +18,7 @@ package org.apache.spark import java.io._ -import java.net.URI +import java.net.{URI, URL} import java.util.{Properties, UUID} import java.util.concurrent.atomic.AtomicInteger @@ -764,6 +764,7 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * This also makes the JAR available to this driver process. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ @@ -807,6 +808,19 @@ class SparkContext( case _ => path } + + // Add jar to driver class loader so it is available for driver, even if it is not on the classpath + uri.getScheme match { + case null | "file" | "local" => + // Assume file exists on current (driver) node as well. Unlike executors, driver doesn't need to + // download the jar since it's local. 
+ addUrlToDriverLoader(new URL("file:" + uri.getPath)) + case "http" | "https" | "ftp" => + // Should be handled by the URLClassLoader, pass along entire URL + addUrlToDriverLoader(new URL(path)) + case other => + logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader") + } } if (key != null) { addedJars(key) = System.currentTimeMillis @@ -816,6 +830,13 @@ class SparkContext( postEnvironmentUpdate() } + private def addUrlToDriverLoader(url: URL) { + if (!env.classLoader.getURLs.contains(url)) { + logInfo("Adding JAR " + url + " to driver class loader") + env.classLoader.addURL(url) + } + } + /** * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 5ceac28fe7afb..b86a45e0b9c73 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.net.URL import scala.collection.JavaConversions._ import scala.collection.mutable import scala.concurrent.Await @@ -27,6 +28,7 @@ import com.google.common.collect.MapMaker import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.ConnectionManager import org.apache.spark.scheduler.LiveListenerBus @@ -56,6 +58,7 @@ class SparkEnv private[spark] ( val httpFileServer: HttpFileServer, val sparkFilesDir: String, val metricsSystem: MetricsSystem, + val classLoader: ExecutorURLClassLoader, val conf: SparkConf) extends Logging { // A mapping of thread ID to amount of memory used for shuffle in bytes @@ -133,6 +136,12 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf) + // Create a classLoader for use by the driver so that jars added via addJar are available to the driver + // Do this before all other initialization so that any thread pools created for this SparkContext + // uses the class loader + val driverLoader = getDriverClassLoader() + Thread.currentThread.setContextClassLoader(driverLoader) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, securityManager = securityManager) @@ -248,9 +257,16 @@ object SparkEnv extends Logging { httpFileServer, sparkFilesDir, metricsSystem, + driverLoader, conf) } + private def getDriverClassLoader(): ExecutorURLClassLoader = { + // Initially there are no jars + val parentLoader = this.getClass.getClassLoader + new ExecutorURLClassLoader(Array.empty[URL], parentLoader) + } + /** * Return a map representation of jvm information, Spark properties, system properties, and * class paths. Map keys define the category, and map values represent the corresponding From 8c8b15b3e0de8669b14c4eb9e12c797e4abbc7b7 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 29 Dec 2013 00:15:17 -0800 Subject: [PATCH 03/14] Move classLoader initialization to SparkContext SparkEnv is used by Executor as well; we want this change to affect driver only. 
--- .../scala/org/apache/spark/SparkContext.scala | 11 +++++++++-- .../main/scala/org/apache/spark/SparkEnv.scala | 15 --------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4e63d05408bbd..83a51e8bd4f20 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -136,6 +137,12 @@ class SparkContext( // An asynchronous listener bus for Spark events private[spark] val listenerBus = new LiveListenerBus + // Create a classLoader for use by the driver so that jars added via addJar are available to the driver + // Do this before all other initialization so that any thread pools created for this SparkContext + // uses the class loader + private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + Thread.currentThread.setContextClassLoader(classLoader) + // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.create( conf, @@ -831,9 +838,9 @@ class SparkContext( } private def addUrlToDriverLoader(url: URL) { - if (!env.classLoader.getURLs.contains(url)) { + if (!classLoader.getURLs.contains(url)) { logInfo("Adding JAR " + url + " to driver class loader") - env.classLoader.addURL(url) + classLoader.addURL(url) } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index b86a45e0b9c73..d5343c28f5356 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -28,7 +28,6 @@ import com.google.common.collect.MapMaker import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager -import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.ConnectionManager import org.apache.spark.scheduler.LiveListenerBus @@ -58,7 +57,6 @@ class SparkEnv private[spark] ( val httpFileServer: HttpFileServer, val sparkFilesDir: String, val metricsSystem: MetricsSystem, - val classLoader: ExecutorURLClassLoader, val conf: SparkConf) extends Logging { // A mapping of thread ID to amount of memory used for shuffle in bytes @@ -136,12 +134,6 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf) - // Create a classLoader for use by the driver so that jars added via addJar are available to the driver - // Do this before all other initialization so that any thread pools created for this SparkContext - // uses the class loader - val driverLoader = getDriverClassLoader() - Thread.currentThread.setContextClassLoader(driverLoader) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, securityManager = securityManager) @@ -257,16 +249,9 @@ object SparkEnv extends Logging { httpFileServer, sparkFilesDir, metricsSystem, - driverLoader, conf) } - private def getDriverClassLoader(): ExecutorURLClassLoader = { - // Initially there are no jars - 
val parentLoader = this.getClass.getClassLoader - new ExecutorURLClassLoader(Array.empty[URL], parentLoader) - } - /** * Return a map representation of jvm information, Spark properties, system properties, and * class paths. Map keys define the category, and map values represent the corresponding From da83523331ba1439bb7819ed2a7f4ea78e8e5bc9 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 29 Dec 2013 00:25:53 -0800 Subject: [PATCH 04/14] Pass the current class loader when creating Akka threadpool --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 5 ++--- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d5343c28f5356..e915f56e813c2 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -133,9 +133,10 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) + val classLoader = Thread.currentThread.getContextClassLoader val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, - securityManager = securityManager) + securityManager = securityManager, classLoader = classLoader) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), // figure out which port number Akka actually bound to and set spark.driver.port to it. @@ -143,8 +144,6 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", boundPort.toString) } - val classLoader = Thread.currentThread.getContextClassLoader - // Create an instance of the class named by the given Java system property, or by // defaultClassName if the property is not set, and return it as a T def instantiateClass[T](propertyName: String, defaultClassName: String): T = { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index d0ff17db632c1..f2a323de07cde 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -42,7 +42,8 @@ private[spark] object AkkaUtils extends Logging { * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, - conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { + conf: SparkConf, securityManager: SecurityManager, + classLoader: ClassLoader = this.getClass.getClassLoader): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) @@ -102,9 +103,9 @@ private[spark] object AkkaUtils extends Logging { """.stripMargin)) val actorSystem = if (indestructible) { - IndestructibleActorSystem(name, akkaConf) + IndestructibleActorSystem(name, akkaConf, classLoader) } else { - ActorSystem(name, akkaConf) + ActorSystem(name, akkaConf, classLoader) } val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider From 2b9a61a8f4d5705d63eba043a04ae2c9ff5f4e4e Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Sun, 12 Jan 2014 07:59:24 -0800 Subject: [PATCH 05/14] CR from pwendell --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 83a51e8bd4f20..7b79e57cc79f3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -140,7 +140,8 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the driver // Do this before all other initialization so that any thread pools created for this SparkContext // uses the class loader - private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], + this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(classLoader) // Create the Spark execution environment (cache, map output tracker, etc) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index f2a323de07cde..46eb98bc9e3ed 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -42,8 +42,9 @@ private[spark] object AkkaUtils extends Logging { * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, - conf: SparkConf, securityManager: SecurityManager, - classLoader: ClassLoader = this.getClass.getClassLoader): (ActorSystem, Int) = { + conf: SparkConf, securityManager: SecurityManager, + classLoader: ClassLoader = Thread.currentThread.getContextClassLoader) + : (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) From 9d62e3dd10976508502efc0e53d4f11e982af89b Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 22:19:27 -0800 Subject: [PATCH 06/14] CR: Rename ExecutorURLClassLoader -> SparkURLClassLoader --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- .../main/scala/org/apache/spark/executor/Executor.scala | 8 ++++---- .../SparkURLClassLoader.scala} | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) rename core/src/main/scala/org/apache/spark/{executor/ExecutorURLClassLoader.scala => util/SparkURLClassLoader.scala} (89%) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7b79e57cc79f3..6c00b50d9b41d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -37,7 +37,6 @@ import org.apache.mesos.MesosNativeLibrary import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} -import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -46,7 +45,8 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} +import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType, + ClosureCleaner, SparkURLClassLoader} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -140,7 +140,7 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the driver // Do this before all other initialization so that any thread pools created for this SparkContext // uses the class loader - private[spark] val classLoader = new ExecutorURLClassLoader(Array.empty[URL], + private[spark] val classLoader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(classLoader) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index aecb069e4202b..9f61c406a0454 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -29,7 +29,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{AkkaUtils, Utils, SparkURLClassLoader} /** * Spark executor used with Mesos, YARN, and the standalone scheduler. 
@@ -291,15 +291,15 @@ private[spark] class Executor( * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes * created by the interpreter to the search path */ - private def createClassLoader(): ExecutorURLClassLoader = { - val loader = Thread.currentThread().getContextClassLoader + private def createClassLoader(): SparkURLClassLoader = { + val loader = this.getClass.getClassLoader // For each of the jars in the jarSet, add them to the class loader. // We assume each of the files has already been fetched. val urls = currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL }.toArray - new ExecutorURLClassLoader(urls, loader) + new SparkURLClassLoader(urls, loader) } /** diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala similarity index 89% rename from core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala rename to core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala index f9bfe8ed2f5ba..19134aca496f4 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.spark.executor +package org.apache.spark.util import java.net.{URLClassLoader, URL} /** * The addURL method in URLClassLoader is protected. We subclass it to make this accessible. */ -private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) +private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) { override def addURL(url: URL) { From bb29207238a3955002fdfa63f59f9963d2e78a61 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 22:34:34 -0800 Subject: [PATCH 07/14] Use SparkURLClassLoader for driver only if spark.driver.add-dynamic-jars is set --- .../scala/org/apache/spark/SparkContext.scala | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6c00b50d9b41d..1a9f67b4dcf8c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -137,12 +137,14 @@ class SparkContext( // An asynchronous listener bus for Spark events private[spark] val listenerBus = new LiveListenerBus - // Create a classLoader for use by the driver so that jars added via addJar are available to the driver - // Do this before all other initialization so that any thread pools created for this SparkContext - // uses the class loader - private[spark] val classLoader = new SparkURLClassLoader(Array.empty[URL], - this.getClass.getClassLoader) - Thread.currentThread.setContextClassLoader(classLoader) + // Create a classLoader for use by the driver so that jars added via addJar are available to the + // driver. Do this before all other initialization so that any thread pools created for this + // SparkContext uses the class loader. 
+ private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { + val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) + Thread.currentThread.setContextClassLoader(loader) + Some(loader) + } else None // Create the Spark execution environment (cache, map output tracker, etc) private[spark] val env = SparkEnv.create( @@ -839,9 +841,11 @@ class SparkContext( } private def addUrlToDriverLoader(url: URL) { - if (!classLoader.getURLs.contains(url)) { - logInfo("Adding JAR " + url + " to driver class loader") - classLoader.addURL(url) + classLoader.foreach { loader => + if (!loader.getURLs.contains(url)) { + logInfo("Adding JAR " + url + " to driver class loader") + loader.addURL(url) + } } } From 77e865e6a2a615de322f009fb77b92efa20824e7 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Thu, 16 Jan 2014 23:54:58 -0800 Subject: [PATCH 08/14] Document new config option and effect on addJar --- .../main/scala/org/apache/spark/SparkContext.scala | 4 +++- docs/cluster-overview.md | 3 ++- docs/configuration.md | 14 ++++++++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1a9f67b4dcf8c..ce4a1a3cd43bb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -140,6 +140,7 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. + // Note that this is config-enabled as classloaders can introduce subtle side effects private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(loader) @@ -774,9 +775,10 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. - * This also makes the JAR available to this driver process. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available + * to this SparkContext. local: JARs must be available on the driver node. */ def addJar(path: String) { if (path == null) { diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index b69e3416fb322..d3c058e0d4d3f 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -108,7 +108,8 @@ and `addFile`. - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, - or shared via NFS, GlusterFS, etc. + or shared via NFS, GlusterFS, etc. Note that if `spark.driver.add-dynamic-jars` is set, then the file + must be visible to the node running the SparkContext as well. Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. Over time this can use up a significant amount of space and will need to be cleaned up. 
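As a usage sketch of the scheme handling described above (the jar paths and host below are hypothetical, and driver-side loading assumes the spark.driver.add-dynamic-jars option introduced in this patch is enabled):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical application code; each form below is accepted by SparkContext.addJar.
    val sc = new SparkContext(new SparkConf()
      .setMaster("local[2]")
      .setAppName("addJar-uri-schemes")
      .set("spark.driver.add-dynamic-jars", "true"))

    sc.addJar("/opt/app/lib/my-udfs.jar")          // plain path or file: URI, must exist on the driver node
    sc.addJar("local:/opt/shared/shared-deps.jar") // assumed to already be present on every node
    sc.addJar("http://repo.example.com/extra.jar") // URL is handed directly to the driver's URLClassLoader
    sc.addJar("hdfs:///jars/cluster-only.jar")     // still shipped to executors, but the driver loader only logs a warning
    sc.stop()
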
diff --git a/docs/configuration.md b/docs/configuration.md index 1ff0150567255..69a28f2a3b40d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -111,6 +111,7 @@ Apart from these, the following properties are also available, and may be useful it if you configure your own old generation size. + spark.shuffle.memoryFraction 0.3 @@ -375,7 +376,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -392,6 +393,15 @@ Apart from these, the following properties are also available, and may be useful Port for the driver to listen on. + + spark.driver.add-dynamic-jars + false + + If true, the SparkContext uses a class loader to make jars added via `addJar` available to the SparkContext. + The default behavior is that jars added via `addJar` are only made available to executors, and Spark apps + must include all its jars in the application CLASSPATH even if `addJar` is used. + + spark.cleaner.ttl (infinite) @@ -430,7 +440,7 @@ Apart from these, the following properties are also available, and may be useful spark.broadcast.blockSize 4096 - Size of each piece of a block in kilobytes for TorrentBroadcastFactory. + Size of each piece of a block in kilobytes for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, BlockManager might take a performance hit. 
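A minimal end-to-end sketch of the behavior documented above, assuming a hypothetical HelloSpark class packaged into a hypothetical jar (the option is renamed to spark.driver.loadAddedJars by a later patch in this series):

    import scala.util.Try
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf()
      .setMaster("local-cluster[1,1,512]")
      .setAppName("driver-class-visibility")
      .set("spark.driver.add-dynamic-jars", "true"))

    // Classes are resolved through the thread's context class loader, which the
    // SparkContext replaces with a SparkURLClassLoader when the option is on.
    def canLoadClass(name: String): Boolean =
      Try(Class.forName(name, true, Thread.currentThread().getContextClassLoader)).isSuccess

    assert(!canLoadClass("HelloSpark"))      // not on the driver classpath yet
    sc.addJar("/tmp/jars/hello-spark.jar")   // hypothetical jar containing HelloSpark
    assert(canLoadClass("HelloSpark"))       // now resolvable on the driver
    sc.stop()
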
From ae4058d8559df7659a6649cced9d25dc806dafca Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 10 Mar 2014 21:56:49 -0700 Subject: [PATCH 09/14] Minor changes --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- docs/cluster-overview.md | 4 ++-- docs/configuration.md | 9 +++++---- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ce4a1a3cd43bb..390b26be5b809 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -141,7 +141,7 @@ class SparkContext( // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. // Note that this is config-enabled as classloaders can introduce subtle side effects - private[spark] val classLoader = if (conf.getBoolean("spark.driver.add-dynamic-jars", false)) { + private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) Thread.currentThread.setContextClassLoader(loader) Some(loader) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index e915f56e813c2..3101dfc429749 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -136,7 +136,7 @@ object SparkEnv extends Logging { val classLoader = Thread.currentThread.getContextClassLoader val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, - securityManager = securityManager, classLoader = classLoader) + securityManager = securityManager) // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port), // figure out which port number Akka actually bound to and set spark.driver.port to it. diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index d3c058e0d4d3f..1aefe46570ff0 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -108,8 +108,8 @@ and `addFile`. - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, - or shared via NFS, GlusterFS, etc. Note that if `spark.driver.add-dynamic-jars` is set, then the file - must be visible to the node running the SparkContext as well. + or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set, + then the file must be visible to the node running the SparkContext as well. Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. Over time this can use up a significant amount of space and will need to be cleaned up. 
diff --git a/docs/configuration.md b/docs/configuration.md index 69a28f2a3b40d..1d4efdac15468 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -394,12 +394,13 @@ Apart from these, the following properties are also available, and may be useful - spark.driver.add-dynamic-jars + spark.driver.loadAddedJars false - If true, the SparkContext uses a class loader to make jars added via `addJar` available to the SparkContext. - The default behavior is that jars added via `addJar` are only made available to executors, and Spark apps - must include all its jars in the application CLASSPATH even if `addJar` is used. + If true, the SparkContext uses a class loader to make jars added via `addJar` available to + the SparkContext. The default behavior is that jars added via `addJar` are only made + available to executors, and Spark apps must include all its jars in the driver's + CLASSPATH even if `addJar` is used. From f12efb70aaffbed5e22c1ed62bde92cfd23881e1 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 11 Mar 2014 10:55:32 -0700 Subject: [PATCH 10/14] Adding unit tests. --- .../org/apache/spark/FileServerSuite.scala | 24 +----- .../scala/org/apache/spark/FileSuite.scala | 25 +++++- .../scala/org/apache/spark/TestUtils.scala | 80 +++++++++++++++++++ 3 files changed, 108 insertions(+), 21 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/TestUtils.scala diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index aee9ab9091dac..9fb74341df0ff 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext { override def beforeEach() { super.beforeEach() resetSparkContext() - System.setProperty("spark.authenticate", "false") } override def beforeAll() { super.beforeAll() + System.setProperty("spark.authenticate", "false") + val tmpDir = new File(Files.createTempDir(), "test") tmpDir.mkdir() @@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext { pw.close() val jarFile = new File(tmpDir, "test.jar") - val jarStream = new FileOutputStream(jarFile) - val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - System.setProperty("spark.authenticate", "false") - - val jarEntry = new JarEntry(textFile.getName) - jar.putNextEntry(jarEntry) - - val in = new FileInputStream(textFile) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead <= 0) { - nRead = in.read(buffer, 0, buffer.length) - jar.write(buffer, 0, nRead) - } - - in.close() - jar.close() - jarStream.close() + val jarUrl = TestUtils.createJar(Seq(textFile), jarFile) tmpFile = textFile - tmpJarUrl = jarFile.toURI.toURL.toString + tmpJarUrl = jarUrl.toString } test("Distributing files locally") { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 01af94077144a..b3139d14d3f95 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.io.{File, FileWriter} import scala.io.Source +import scala.util.Try import com.google.common.io.Files import org.apache.hadoop.io._ @@ -32,6 +33,28 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ class FileSuite extends FunSuite with LocalSparkContext { + test("adding jars to 
classpath at the driver") { + val tmpDir = Files.createTempDir() + val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir) + val jarFile = new File(tmpDir, "test.jar") + TestUtils.createJar(Seq(classFile), jarFile) + + def canLoadClass(clazz: String) = + Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess + + val driverLoadedBefore = canLoadClass("HelloSpark") + + val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test") + .set("spark.driver.loadAddedJars", "true") + + val sc = new SparkContext(conf) + sc.addJar(jarFile.getAbsolutePath) + + val driverLoadedAfter = canLoadClass("HelloSpark") + + assert(false === driverLoadedBefore, "Class visible before being added") + assert(true === driverLoadedAfter, "Class was not visible after being added") + } test("text files") { sc = new SparkContext("local", "test") @@ -106,7 +129,7 @@ class FileSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val tempDir = Files.createTempDir() val outputDir = new File(tempDir, "output").getAbsolutePath - val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) + val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) nums.saveAsSequenceFile(outputDir) // Try reading the output back as a SequenceFile val output = sc.sequenceFile[IntWritable, Text](outputDir) diff --git a/core/src/test/scala/org/apache/spark/TestUtils.scala b/core/src/test/scala/org/apache/spark/TestUtils.scala new file mode 100644 index 0000000000000..6b72bacac47a2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/TestUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.JavaConversions._ + +import java.io.{FileInputStream, FileOutputStream, File} +import java.util.jar.{JarEntry, JarOutputStream} +import java.net.{URL, URI} +import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} + +object TestUtils { + + /** Create a jar file that contains this set of files. All files will be located at the root + * of the jar. 
*/ + def createJar(files: Seq[File], jarFile: File): URL = { + val jarFileStream = new FileOutputStream(jarFile) + val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest()) + + for (file <- files) { + val jarEntry = new JarEntry(file.getName) + jarStream.putNextEntry(jarEntry) + + val in = new FileInputStream(file) + val buffer = new Array[Byte](10240) + var nRead = 0 + while (nRead <= 0) { + nRead = in.read(buffer, 0, buffer.length) + jarStream.write(buffer, 0, nRead) + } + in.close() + } + jarStream.close() + jarFileStream.close() + + jarFile.toURI.toURL + } + + // Adapted from the JavaCompiler.java doc examples + private val SOURCE = JavaFileObject.Kind.SOURCE + private def createURI(name: String) = { + URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}") + } + private class JavaSourceFromString(val name: String, val code: String) + extends SimpleJavaFileObject(createURI(name), SOURCE) { + override def getCharContent(ignoreEncodingErrors: Boolean) = code + } + + /** Creates a compiled class with the given name. Class file will be placed in destDir. */ + def createCompiledClass(className: String, destDir: File): File = { + val compiler = ToolProvider.getSystemJavaCompiler + val sourceFile = new JavaSourceFromString(className, s"public class $className {}") + + // Calling this outputs a class file in pwd. It's easier to just rename the file than + // build a custom FileManager that controls the output location. + compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call() + + val fileName = className + ".class" + val result = new File(fileName) + if (!result.exists()) throw new Exception("Compiled file not found: " + fileName) + val out = new File(destDir, fileName) + result.renameTo(out) + out + } +} From a3093cbaca00434075a5c8d04f7fbc5e88d15cfb Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 12 Mar 2014 10:00:56 -0700 Subject: [PATCH 11/14] Review feedback --- .../scala/org/apache/spark/SparkContext.scala | 6 ++- .../scala/org/apache/spark/FileSuite.scala | 43 ++++++++++++++++--- docs/configuration.md | 9 ++-- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 390b26be5b809..a37601d13ba3b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -140,6 +140,8 @@ class SparkContext( // Create a classLoader for use by the driver so that jars added via addJar are available to the // driver. Do this before all other initialization so that any thread pools created for this // SparkContext uses the class loader. + // In the future it might make sense to expose this to users so they can assign it as the + // context class loader for other threads. // Note that this is config-enabled as classloaders can introduce subtle side effects private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) { val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader) @@ -777,8 +779,8 @@ class SparkContext( * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. 
- * NOTE: If you enable spark.driver.add-dynamic-jars, then the JAR will also be made available - * to this SparkContext. local: JARs must be available on the driver node. + * NOTE: If you enable spark.driver.loadAddedJars, then the JAR will also be made available + * to this SparkContext and chld threads. local: JARs must be available on the driver node. */ def addJar(path: String) { if (path == null) { diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index b3139d14d3f95..85a43dce5a263 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.io.{File, FileWriter} +import java.util.concurrent.Semaphore import scala.io.Source import scala.util.Try @@ -42,18 +43,46 @@ class FileSuite extends FunSuite with LocalSparkContext { def canLoadClass(clazz: String) = Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess - val driverLoadedBefore = canLoadClass("HelloSpark") + val loadedBefore = canLoadClass("HelloSpark") val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test") .set("spark.driver.loadAddedJars", "true") - val sc = new SparkContext(conf) - sc.addJar(jarFile.getAbsolutePath) - - val driverLoadedAfter = canLoadClass("HelloSpark") - - assert(false === driverLoadedBefore, "Class visible before being added") + var driverLoadedAfter = false + var childLoadedAfter = false + + val sem = new Semaphore(1) + sem.acquire() + + new Thread() { + override def run() { + val sc = new SparkContext(conf) + sc.addJar(jarFile.getAbsolutePath) + driverLoadedAfter = canLoadClass("HelloSpark") + + // Test visibility in a child thread + val childSem = new Semaphore(1) + childSem.acquire() + new Thread() { + override def run() { + childLoadedAfter = canLoadClass("HelloSpark") + childSem.release() + } + }.start() + + childSem.acquire() + sem.release() + } + }.start() + sem.acquire() + + // Test visibility in a parent thread + val parentLoadedAfter = canLoadClass("HelloSpark") + + assert(false === loadedBefore, "Class visible before being added") assert(true === driverLoadedAfter, "Class was not visible after being added") + assert(true === childLoadedAfter, "Class was not visible to child thread after being added") + assert(false === parentLoadedAfter, "Class was visible to parent thread after being added") } test("text files") { diff --git a/docs/configuration.md b/docs/configuration.md index 1d4efdac15468..896ad0be5e9aa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -111,7 +111,6 @@ Apart from these, the following properties are also available, and may be useful it if you configure your own old generation size. - spark.shuffle.memoryFraction 0.3 @@ -376,7 +375,7 @@ Apart from these, the following properties are also available, and may be useful spark.akka.heartbeat.interval 1000 - This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. 
However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. + This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. @@ -398,9 +397,9 @@ Apart from these, the following properties are also available, and may be useful false If true, the SparkContext uses a class loader to make jars added via `addJar` available to - the SparkContext. The default behavior is that jars added via `addJar` are only made - available to executors, and Spark apps must include all its jars in the driver's - CLASSPATH even if `addJar` is used. + the SparkContext. The default behavior is that jars added via `addJar` must already be on + the classpath. Jar contents will be visible to the thread that created the SparkContext + and all of its child threads. From 60d2410cbe74f87ae5bdd09e9b9e6eddc4df4663 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 12 Mar 2014 10:51:45 -0700 Subject: [PATCH 12/14] Style fixes --- core/src/main/scala/org/apache/spark/SparkContext.scala | 9 +++++---- .../org/apache/spark/scheduler/TaskResultGetter.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a37601d13ba3b..163f9c88f2bdd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -823,17 +823,18 @@ class SparkContext( path } - // Add jar to driver class loader so it is available for driver, even if it is not on the classpath + // Add jar to driver class loader so it is available for driver, + // even if it is not on the classpath uri.getScheme match { case null | "file" | "local" => - // Assume file exists on current (driver) node as well. Unlike executors, driver doesn't need to - // download the jar since it's local. + // Assume file exists on current (driver) node as well. Unlike executors, driver + // doesn't need to download the jar since it's local. 
addUrlToDriverLoader(new URL("file:" + uri.getPath)) case "http" | "https" | "ftp" => // Should be handled by the URLClassLoader, pass along entire URL addUrlToDriverLoader(new URL(path)) case other => - logWarning("This URI scheme for URI " + path + " is not supported by the driver class loader") + logWarning(s"This URI scheme for URI $path is not supported by the driver class loader") } } if (key != null) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 75f3d789eb42f..1747add15dd9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader - taskSetManager.abort("ClassNotFound [" + cnf.getMessage + "] with classloader: " + loader) + taskSetManager.abort(s"ClassNotFound [${cnf.getMessage}] with classloader: " + loader) case ex: Throwable => taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex)) } From 6938fcd66e968fcda56b6bbc1e000bb8d422caf1 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Mon, 7 Apr 2014 12:17:11 -0700 Subject: [PATCH 13/14] A few fixes to get Spark to compile --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++--- .../spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1fa799190409f..c3855ea41ae43 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy import java.io.{PrintStream, File} import java.net.URL -import org.apache.spark.executor.ExecutorURLClassLoader +import org.apache.spark.util.SparkURLClassLoader import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -195,7 +195,7 @@ object SparkSubmit { System.err.println("\n") } - val loader = new ExecutorURLClassLoader(new Array[URL](0), + val loader = new SparkURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) Thread.currentThread.setContextClassLoader(loader) @@ -212,7 +212,7 @@ object SparkSubmit { mainMethod.invoke(null, childArgs.toArray) } - private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { + private def addJarToClasspath(localJar: String, loader: SparkURLClassLoader) { val localJarFile = new File(localJar) if (!localJarFile.exists()) { printWarning(s"Jar $localJar does not exist, skipping.") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3486092a140fb..bc71b32ee8f11 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -101,7 +101,7 @@ private[spark] object CoarseGrainedExecutorBackend { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = 
AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, - indestructible = true, conf = conf, new SecurityManager(conf)) + indestructible = true, conf = conf, securityManager = new SecurityManager(conf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( From 5cd73817a94de656325d4d7680fb95f1e6256e65 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Mon, 7 Apr 2014 17:25:11 -0700 Subject: [PATCH 14/14] Reset context classloader after each test so test passes --- core/src/test/scala/org/apache/spark/FileSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 85a43dce5a263..aab232c8f806f 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -34,6 +34,12 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ class FileSuite extends FunSuite with LocalSparkContext { + val loader = Thread.currentThread.getContextClassLoader + override def afterEach() { + super.afterEach() + Thread.currentThread.setContextClassLoader(loader) + } + test("adding jars to classpath at the driver") { val tmpDir = Files.createTempDir() val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)