diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e5ebd350eeced..80a7afb3999f8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.io._
-import java.net.URI
+import java.net.{URI, URL}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
@@ -45,7 +45,8 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
+  ClosureCleaner, SparkURLClassLoader}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -141,6 +142,18 @@ class SparkContext(
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
+  // Create a class loader for use by the driver so that jars added via addJar are available to
+  // the driver. Do this before all other initialization so that any thread pools created for
+  // this SparkContext use the class loader.
+  // In the future it might make sense to expose this to users so they can assign it as the
+  // context class loader for other threads.
+  // Note that this is config-enabled, as class loaders can introduce subtle side effects.
+  private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
+    val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
+    Thread.currentThread.setContextClassLoader(loader)
+    Some(loader)
+  } else None
+
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.create(
     conf,
@@ -800,6 +813,8 @@ class SparkContext(
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+   * NOTE: If you enable spark.driver.loadAddedJars, the JAR will also be made available to
+   * this SparkContext and its child threads. local: JARs must be available on the driver node.
    */
   def addJar(path: String) {
     if (path == null) {
@@ -841,6 +856,20 @@ class SparkContext(
           case _ => path
         }
+
+        // Add the jar to the driver class loader so it is available to the driver,
+        // even if it is not on the classpath.
+        uri.getScheme match {
+          case null | "file" | "local" =>
+            // Assume the file exists on the current (driver) node as well. Unlike executors,
+            // the driver doesn't need to download the jar since it's local.
+            addUrlToDriverLoader(new URL("file:" + uri.getPath))
+          case "http" | "https" | "ftp" =>
+            // Should be handled by the URLClassLoader, pass along the entire URL
+            addUrlToDriverLoader(new URL(path))
+          case other =>
+            logWarning(s"The URI scheme '$other' of $path is not supported by the driver class loader")
+        }
       }
       if (key != null) {
         addedJars(key) = System.currentTimeMillis
@@ -850,6 +879,15 @@ class SparkContext(
     postEnvironmentUpdate()
   }
 
+  private def addUrlToDriverLoader(url: URL) {
+    classLoader.foreach { loader =>
+      if (!loader.getURLs.contains(url)) {
+        logInfo("Adding JAR " + url + " to driver class loader")
+        loader.addURL(url)
+      }
+    }
+  }
+
   /**
    * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
    * any new nodes.
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 5ceac28fe7afb..3101dfc429749 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,7 @@
 package org.apache.spark
 
+import java.net.URL
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.concurrent.Await
 
@@ -132,6 +133,7 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
+    val classLoader = Thread.currentThread.getContextClassLoader
 
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
       securityManager = securityManager)
@@ -142,8 +144,6 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", boundPort.toString)
     }
 
-    val classLoader = Thread.currentThread.getContextClassLoader
-
     // Create an instance of the class named by the given Java system property, or by
     // defaultClassName if the property is not set, and return it as a T
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e05fbfe321495..e00ecf728421d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{PrintStream, File}
 import java.net.URL
 
-import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.util.SparkURLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
@@ -198,7 +198,7 @@ object SparkSubmit {
       System.err.println("\n")
     }
 
-    val loader = new ExecutorURLClassLoader(new Array[URL](0),
+    val loader = new SparkURLClassLoader(new Array[URL](0),
       Thread.currentThread.getContextClassLoader)
     Thread.currentThread.setContextClassLoader(loader)
 
@@ -215,7 +215,7 @@ object SparkSubmit {
     mainMethod.invoke(null, childArgs.toArray)
   }
 
-  private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+  private def addJarToClasspath(localJar: String, loader: SparkURLClassLoader) {
     val localJarFile = new File(localJar)
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 16887d8892b31..7b187d0909767 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -102,7 +102,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = conf, new SecurityManager(conf))
+      indestructible = true, conf = conf, securityManager = new SecurityManager(conf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index aecb069e4202b..9f61c406a0454 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, Utils, SparkURLClassLoader}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -291,15 +291,15 @@ private[spark] class Executor(
    * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
    * created by the interpreter to the search path
    */
-  private def createClassLoader(): ExecutorURLClassLoader = {
-    val loader = Thread.currentThread().getContextClassLoader
+  private def createClassLoader(): SparkURLClassLoader = {
+    val loader = this.getClass.getClassLoader
 
     // For each of the jars in the jarSet, add them to the class loader.
     // We assume each of the files has already been fetched.
     val urls = currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }.toArray
-    new ExecutorURLClassLoader(urls, loader)
+    new SparkURLClassLoader(urls, loader)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index cb4ad4ae9350c..1747add15dd9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
         } catch {
           case cnf: ClassNotFoundException =>
             val loader = Thread.currentThread.getContextClassLoader
-            taskSetManager.abort("ClassNotFound with classloader: " + loader)
+            taskSetManager.abort(s"ClassNotFound [${cnf.getMessage}] with classloader: " + loader)
           case ex: Throwable =>
             taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
         }
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index d0ff17db632c1..46eb98bc9e3ed 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -42,7 +42,9 @@ private[spark] object AkkaUtils extends Logging {
    * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
   */
  def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
-    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
+    conf: SparkConf, securityManager: SecurityManager,
+    classLoader: ClassLoader = Thread.currentThread.getContextClassLoader)
+    : (ActorSystem, Int) = {
 
     val akkaThreads = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
@@ -102,9 +104,9 @@ private[spark] object AkkaUtils extends Logging {
       """.stripMargin))
 
     val actorSystem = if (indestructible) {
-      IndestructibleActorSystem(name, akkaConf)
+      IndestructibleActorSystem(name, akkaConf, classLoader)
     } else {
-      ActorSystem(name, akkaConf)
+      ActorSystem(name, akkaConf, classLoader)
     }
 
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
similarity index 89%
rename from core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
rename to core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
index f9bfe8ed2f5ba..19134aca496f4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkURLClassLoader.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.executor
+package org.apache.spark.util
 
 import java.net.{URLClassLoader, URL}
 
 /**
  * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
  */
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
+private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader)
   extends URLClassLoader(urls, parent) {
 
   override def addURL(url: URL) {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index aee9ab9091dac..9fb74341df0ff 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
     super.beforeAll()
+    System.setProperty("spark.authenticate", "false")
+
     val tmpDir = new File(Files.createTempDir(), "test")
     tmpDir.mkdir()
 
@@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     pw.close()
 
     val jarFile = new File(tmpDir, "test.jar")
-    val jarStream = new FileOutputStream(jarFile)
-    val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
-
-    val jarEntry = new JarEntry(textFile.getName)
-    jar.putNextEntry(jarEntry)
-
-    val in = new FileInputStream(textFile)
-    val buffer = new Array[Byte](10240)
-    var nRead = 0
-    while (nRead <= 0) {
-      nRead = in.read(buffer, 0, buffer.length)
-      jar.write(buffer, 0, nRead)
-    }
-
-    in.close()
-    jar.close()
-    jarStream.close()
+    val jarUrl = TestUtils.createJar(Seq(textFile), jarFile)
 
     tmpFile = textFile
-    tmpJarUrl = jarFile.toURI.toURL.toString
+    tmpJarUrl = jarUrl.toString
   }
 
   test("Distributing files locally") {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 01af94077144a..aab232c8f806f 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,8 +18,10 @@
 package org.apache.spark
 
 import java.io.{File, FileWriter}
+import java.util.concurrent.Semaphore
 
 import scala.io.Source
+import scala.util.Try
 
 import com.google.common.io.Files
 import org.apache.hadoop.io._
@@ -32,6 +34,62 @@ import org.scalatest.FunSuite
 import org.apache.spark.SparkContext._
 
 class FileSuite extends FunSuite with LocalSparkContext {
+  val loader = Thread.currentThread.getContextClassLoader
+  override def afterEach() {
+    super.afterEach()
+    Thread.currentThread.setContextClassLoader(loader)
+  }
+
+  test("adding jars to classpath at the driver") {
+    val tmpDir = Files.createTempDir()
+    val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)
+    val jarFile = new File(tmpDir, "test.jar")
+    TestUtils.createJar(Seq(classFile), jarFile)
+
+    def canLoadClass(clazz: String) =
+      Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess
+
+    val loadedBefore = canLoadClass("HelloSpark")
+
+    val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test")
+      .set("spark.driver.loadAddedJars", "true")
+
+    var driverLoadedAfter = false
+    var childLoadedAfter = false
+
+    val sem = new Semaphore(1)
+    sem.acquire()
+
+    new Thread() {
+      override def run() {
+        val sc = new SparkContext(conf)
+        sc.addJar(jarFile.getAbsolutePath)
+        driverLoadedAfter = canLoadClass("HelloSpark")
+
+        // Test visibility in a child thread
+        val childSem = new Semaphore(1)
+        childSem.acquire()
+        new Thread() {
+          override def run() {
+            childLoadedAfter = canLoadClass("HelloSpark")
+            childSem.release()
+          }
+        }.start()
+
+        childSem.acquire()
+        sem.release()
+      }
+    }.start()
+    sem.acquire()
+
+    // Test visibility in a parent thread
+    val parentLoadedAfter = canLoadClass("HelloSpark")
+
+    assert(false === loadedBefore, "Class visible before being added")
+    assert(true === driverLoadedAfter, "Class was not visible after being added")
+    assert(true === childLoadedAfter, "Class was not visible to child thread after being added")
+    assert(false === parentLoadedAfter, "Class was visible to parent thread after being added")
+  }
 
   test("text files") {
     sc = new SparkContext("local", "test")
@@ -106,7 +164,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
-    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x)) 
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
     nums.saveAsSequenceFile(outputDir)
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 7f75ea44e4cea..41577bfcb74bd 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -108,7 +108,8 @@ and `addFile`.
 - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
 - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
   means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
-  or shared via NFS, GlusterFS, etc.
+  or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set,
+  then the file must be visible to the node running the SparkContext as well.
 
 Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
 Over time this can use up a significant amount of space and will need to be cleaned up.
diff --git a/docs/configuration.md b/docs/configuration.md
index 57bda20edcdf1..8302f2530873f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -433,6 +433,16 @@ Apart from these, the following properties are also available, and may be useful
     Port for the driver to listen on.
+
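As a usage illustration only (not part of the patch): a minimal driver program that exercises the new `spark.driver.loadAddedJars` flag, mirroring what the new FileSuite test above verifies. The jar path and class name are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AddJarOnDriverExample {
  def main(args: Array[String]): Unit = {
    // Opt in to loading addJar'd JARs on the driver; the flag defaults to false.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("driver addJar example")
      .set("spark.driver.loadAddedJars", "true")
    val sc = new SparkContext(conf)

    // Placeholder path: the JAR must exist on the driver node (see the addJar scaladoc note).
    sc.addJar("/path/to/extra-udfs.jar")

    // Classes from the added JAR are now resolvable through the context class loader of the
    // thread that created the SparkContext, and of threads it spawns afterwards.
    val clazz = Class.forName("com.example.MyUdf", true,
      Thread.currentThread().getContextClassLoader)
    println(s"Loaded ${clazz.getName} on the driver")

    sc.stop()
  }
}
```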