From 46fe70a644e7e4f22e3c78b6e4689472327572c0 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 10 Dec 2014 16:20:10 -0800 Subject: [PATCH 1/9] SPARK-4687. Add a addDirectory API --- .../scala/org/apache/spark/SparkContext.scala | 27 ++++++++++- .../scala/org/apache/spark/SparkEnv.scala | 7 +-- .../org/apache/spark/executor/Executor.scala | 19 ++++++-- .../org/apache/spark/scheduler/Task.scala | 45 +++++++++---------- .../spark/scheduler/TaskSetManager.scala | 3 +- .../scala/org/apache/spark/util/Utils.scala | 34 ++++++++++++++ .../org/apache/spark/SparkContextSuite.scala | 43 ++++++++++++++++++ 7 files changed, 143 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3c61c10820ba9..54406f9b66ce0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -246,6 +246,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Used to store a URL for each static file/jar together with the file's local timestamp private[spark] val addedFiles = HashMap[String, Long]() private[spark] val addedJars = HashMap[String, Long]() + private[spark] val addedDirs = HashMap[String, Long]() // Keeps track of all persisted RDDs private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] @@ -1014,6 +1015,27 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } + /** + * Add a directory to be downloaded with this Spark job on every node. + * The `path` passed must be a directory in HDFS (or other Hadoop-supported + * filesystems). To access the directory in Spark jobs, use + * `SparkFiles.get(directoryName)` to find its download location. + */ + def addDirectory(path: String, fetchLocal: Boolean = true): Unit = { + val timestamp = System.currentTimeMillis + // TODO: check if addedDirs already contains path and throw an exception if so + addedDirs(path) = timestamp + + if (fetchLocal) { + // Fetch the file locally in case a job is executed using DAGScheduler.runLocally(). + Utils.fetchHcfsDir(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, + hadoopConfiguration) + } + + logInfo("Added dir " + path + " at " + path + " with timestamp " + addedDirs(path)) + postEnvironmentUpdate() + } + /** * :: DeveloperApi :: * Register a listener to receive up-calls from events that happen during execution. 
@@ -1549,8 +1571,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq - val environmentDetails = - SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) + val addedDirPaths = addedDirs.keys.toSeq + val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, + addedFilePaths, addedDirPaths) val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 1264a8126153b..db276edf70ec8 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -379,7 +379,8 @@ object SparkEnv extends Logging { conf: SparkConf, schedulingMode: String, addedJars: Seq[String], - addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { + addedFiles: Seq[String], + addedDirs: Seq[String]): Map[String, Seq[(String, String)]] = { import Properties._ val jvmInformation = Seq( @@ -409,8 +410,8 @@ object SparkEnv extends Logging { .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath")) - val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) - val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted + val addedResources = (addedJars ++ addedFiles ++ addedDirs).map((_, "Added By User")) + val classPaths = (addedResources ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 312bb3a1daaa3..38e5cb921fec6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -50,9 +50,10 @@ private[spark] class Executor( logInfo(s"Starting executor ID $executorId on host $executorHostname") // Application dependencies (added through SparkContext) that we've fetched so far on this node. - // Each map holds the master's timestamp for the version of that file or JAR we got. + // Each map holds the master's timestamp for the version of that file, JAR, or directory we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() + private val currentDirs: HashMap[String, Long] = new HashMap[String, Long]() private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) @@ -171,8 +172,9 @@ private[spark] class Executor( startGCTime = gcTime try { - val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) - updateDependencies(taskFiles, taskJars) + val (taskFiles, taskJars, taskDirs, taskBytes) = + Task.deserializeWithDependencies(serializedTask) + updateDependencies(taskFiles, taskJars, taskDirs) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // If this task has been killed before we deserialized it, let's quit now. Otherwise, @@ -333,7 +335,10 @@ private[spark] class Executor( * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. 
*/ - private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { + private def updateDependencies( + newFiles: HashMap[String, Long], + newJars: HashMap[String, Long], + newDirs: HashMap[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies @@ -358,6 +363,12 @@ private[spark] class Executor( urlClassLoader.addURL(url) } } + for ((name, timestamp) <- newDirs if currentDirs.getOrElse(name, -1L) < timestamp) { + logInfo("Fetching " + name + " with timestamp " + timestamp) + Utils.fetchHcfsDir(name, new File(SparkFiles.getRootDirectory), conf, + env.securityManager, hadoopConf) + currentDirs(name) = timestamp + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 847a4912eec13..6620e1b8424e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -124,24 +124,19 @@ private[spark] object Task { task: Task[_], currentFiles: HashMap[String, Long], currentJars: HashMap[String, Long], + currentDirs: HashMap[String, Long], serializer: SerializerInstance) : ByteBuffer = { val out = new ByteArrayOutputStream(4096) val dataOut = new DataOutputStream(out) - // Write currentFiles - dataOut.writeInt(currentFiles.size) - for ((name, timestamp) <- currentFiles) { - dataOut.writeUTF(name) - dataOut.writeLong(timestamp) - } - - // Write currentJars - dataOut.writeInt(currentJars.size) - for ((name, timestamp) <- currentJars) { - dataOut.writeUTF(name) - dataOut.writeLong(timestamp) + Array(currentFiles, currentJars, currentDirs).foreach { resourceMap => + dataOut.writeInt(resourceMap.size) + for ((name, timestamp) <- resourceMap) { + dataOut.writeUTF(name) + dataOut.writeLong(timestamp) + } } // Write the task itself and finish @@ -159,27 +154,27 @@ private[spark] object Task { * @return (taskFiles, taskJars, taskBytes) */ def deserializeWithDependencies(serializedTask: ByteBuffer) - : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { + : (HashMap[String, Long], HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { val in = new ByteBufferInputStream(serializedTask) val dataIn = new DataInputStream(in) - // Read task's files - val taskFiles = new HashMap[String, Long]() - val numFiles = dataIn.readInt() - for (i <- 0 until numFiles) { - taskFiles(dataIn.readUTF()) = dataIn.readLong() + def deserializeResourceMap(): HashMap[String, Long] = { + val map = new HashMap[String, Long]() + val numFiles = dataIn.readInt() + for (i <- 0 until numFiles) { + map(dataIn.readUTF()) = dataIn.readLong() + } + map } - // Read task's JARs - val taskJars = new HashMap[String, Long]() - val numJars = dataIn.readInt() - for (i <- 0 until numJars) { - taskJars(dataIn.readUTF()) = dataIn.readLong() - } + // Read task's files + val taskFiles = deserializeResourceMap() + val taskJars = deserializeResourceMap() + val taskDirs = deserializeResourceMap() // Create a sub-buffer for the rest of the data, which is the serialized Task object val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task - (taskFiles, taskJars, subBuffer) + (taskFiles, taskJars, taskDirs, subBuffer) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c94c6bbcb37b..4814b2b8a2afa 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -460,7 +460,8 @@ private[spark] class TaskSetManager( // Serialize and return the task val startTime = clock.getTime() val serializedTask: ByteBuffer = try { - Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) + Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, + sched.sc.addedDirs, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 86ac307fc84ba..06bb7055cb63b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -569,6 +569,40 @@ private[spark] object Utils extends Logging { } } + /** + * Copies a remote directory from a Hadoop-compatible file system to local disk. + */ + def fetchHcfsDir( + url: String, + targetDir: File, + conf: SparkConf, + securityMgr: SecurityManager, + hadoopConf: Configuration): Unit = { + val uri = new URI(url) + val fs = getHadoopFileSystem(uri, hadoopConf) + val path = new Path(uri) + doFetchHcfsDir(path, new File(targetDir, path.getName), fs, conf, securityMgr, hadoopConf) + } + + def doFetchHcfsDir( + path: Path, + targetDir: File, + fs: FileSystem, + conf: SparkConf, + securityMgr: SecurityManager, + hadoopConf: Configuration): Unit = { + targetDir.mkdir() + fs.listStatus(path).foreach { fileStatus => + val innerPath = fileStatus.getPath + if (fileStatus.isDirectory) { + doFetchHcfsDir(innerPath, new File(targetDir, innerPath.getName), fs, conf, securityMgr, + hadoopConf) + } else { + doFetchFile(innerPath.toString, targetDir, innerPath.getName, conf, securityMgr, hadoopConf) + } + } + } + /** * Get the path of a temporary directory. 
Spark's local directories can be configured through * multiple settings, which are used with the following precedence: diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 8b3c6871a7b39..84310496985a6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark import org.scalatest.FunSuite import org.apache.hadoop.io.BytesWritable +import java.io.File class SparkContextSuite extends FunSuite with LocalSparkContext { @@ -72,4 +73,46 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val byteArray2 = converter.convert(bytesWritable) assert(byteArray2.length === 0) } + + test("addFile recursive works") { + val pluto = new File("pluto") + val neptune = new File(pluto, "neptune") + val saturn = new File(neptune, "saturn") + val alien1 = new File(neptune, "alien1") + val alien2 = new File(saturn, "alien2") + + try { + assert(neptune.mkdirs()) + assert(saturn.mkdir()) + assert(alien1.createNewFile()) + assert(alien2.createNewFile()) + + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addDirectory(neptune.getAbsolutePath) + sc.parallelize(Array(1), 1).map(x => { + val sep = File.separator + if (!new File(SparkFiles.get("neptune" + sep + "alien1")).exists()) { + throw new SparkException("can't access file under root added directory") + } + if (!new File(SparkFiles.get("neptune" + sep + "saturn" + sep + "alien2")).exists()) { + throw new SparkException("can't access file in nested directory") + } + if (new File(SparkFiles.get("pluto" + sep + "neptune" + sep + "alien1")).exists()) { + throw new SparkException("file exists that shouldn't") + } + x + }).count() + } finally { + sc.stop() + alien2.delete() + saturn.delete() + alien1.delete() + neptune.delete() + pluto.delete() + } + } + + test("addFile recursive can't add the same directory twice") { + + } } From 0239c3d2a3ca8767b062677344449eb17fc818a5 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 14 Jan 2015 10:06:42 -0800 Subject: [PATCH 2/9] Change addDirectory to addFile with recursive --- .../scala/org/apache/spark/SparkContext.scala | 89 +++++++++++-------- .../scala/org/apache/spark/SparkEnv.scala | 7 +- .../org/apache/spark/executor/Executor.scala | 12 +-- .../org/apache/spark/scheduler/Task.scala | 45 +++++----- .../spark/scheduler/TaskSetManager.scala | 3 +- .../scala/org/apache/spark/util/Utils.scala | 55 +++++------- .../org/apache/spark/SparkContextSuite.scala | 2 +- 7 files changed, 110 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 54406f9b66ce0..bcb791482f99f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,29 +24,37 @@ import java.net.URI import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger import java.util.UUID.randomUUID + import scala.collection.{Map, Set} import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} + +import akka.actor.Props + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, 
FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} -import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, + FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} +import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, + TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + import org.apache.mesos.MesosNativeLibrary -import akka.actor.Props import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.TriggerThreadDump -import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat} +import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, + FixedLengthBinaryInputFormat} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, + SparkDeploySchedulerBackend, SimrSchedulerBackend} import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ @@ -246,7 +254,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Used to store a URL for each static file/jar together with the file's local timestamp private[spark] val addedFiles = HashMap[String, Long]() private[spark] val addedJars = HashMap[String, Long]() - private[spark] val addedDirs = HashMap[String, Long]() // Keeps track of all persisted RDDs private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] @@ -997,12 +1004,46 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, * use `SparkFiles.get(fileName)` to find its download location. */ - def addFile(path: String) { + def addFile(path: String): Unit = { + addFile(path, false) + } + + /** + * Add a file to be downloaded with this Spark job on every node. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + * use `SparkFiles.get(fileName)` to find its download location. + * + * A directory can be given if the recursive option is set to true. Currently directories are only + * supported for Hadoop-supported filesystems. 
+ */ + def addFile(path: String, recursive: Boolean): Unit = { val uri = new URI(path) - val key = uri.getScheme match { - case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) - case "local" => "file:" + uri.getPath - case _ => path + val schemeCorrectedPath = uri.getScheme match { + case null | "local" => "file:" + uri.getPath + case _ => path + } + + val hadoopPath = new Path(schemeCorrectedPath) + val scheme = new URI(schemeCorrectedPath).getScheme + if (!Array("http", "https", "ftp").contains(scheme)) { + val fs = hadoopPath.getFileSystem(hadoopConfiguration) + if (!fs.exists(hadoopPath)) { + throw new SparkException(s"Added file $hadoopPath does not exist.") + } + val isDir = fs.isDirectory(hadoopPath) + if (scheme == "file" && isDir) { + throw new SparkException(s"addFile does not support adding local directories.") + } + if (!recursive && isDir) { + throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " + + "turned on.") + } + } + + val key = scheme match { + case "file" => env.httpFileServer.addFile(new File(uri.getPath)) + case _ => path } val timestamp = System.currentTimeMillis addedFiles(key) = timestamp @@ -1015,27 +1056,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } - /** - * Add a directory to be downloaded with this Spark job on every node. - * The `path` passed must be a directory in HDFS (or other Hadoop-supported - * filesystems). To access the directory in Spark jobs, use - * `SparkFiles.get(directoryName)` to find its download location. - */ - def addDirectory(path: String, fetchLocal: Boolean = true): Unit = { - val timestamp = System.currentTimeMillis - // TODO: check if addedDirs already contains path and throw an exception if so - addedDirs(path) = timestamp - - if (fetchLocal) { - // Fetch the file locally in case a job is executed using DAGScheduler.runLocally(). - Utils.fetchHcfsDir(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, - hadoopConfiguration) - } - - logInfo("Added dir " + path + " at " + path + " with timestamp " + addedDirs(path)) - postEnvironmentUpdate() - } - /** * :: DeveloperApi :: * Register a listener to receive up-calls from events that happen during execution. 
@@ -1571,9 +1591,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq - val addedDirPaths = addedDirs.keys.toSeq val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, - addedFilePaths, addedDirPaths) + addedFilePaths) val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index db276edf70ec8..1264a8126153b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -379,8 +379,7 @@ object SparkEnv extends Logging { conf: SparkConf, schedulingMode: String, addedJars: Seq[String], - addedFiles: Seq[String], - addedDirs: Seq[String]): Map[String, Seq[(String, String)]] = { + addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { import Properties._ val jvmInformation = Seq( @@ -410,8 +409,8 @@ object SparkEnv extends Logging { .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath")) - val addedResources = (addedJars ++ addedFiles ++ addedDirs).map((_, "Added By User")) - val classPaths = (addedResources ++ classPathEntries).sorted + val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) + val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 38e5cb921fec6..caf9cffbc428f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -53,7 +53,6 @@ private[spark] class Executor( // Each map holds the master's timestamp for the version of that file, JAR, or directory we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() - private val currentDirs: HashMap[String, Long] = new HashMap[String, Long]() private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) @@ -174,7 +173,7 @@ private[spark] class Executor( try { val (taskFiles, taskJars, taskDirs, taskBytes) = Task.deserializeWithDependencies(serializedTask) - updateDependencies(taskFiles, taskJars, taskDirs) + updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // If this task has been killed before we deserialized it, let's quit now. 
Otherwise, @@ -337,8 +336,7 @@ private[spark] class Executor( */ private def updateDependencies( newFiles: HashMap[String, Long], - newJars: HashMap[String, Long], - newDirs: HashMap[String, Long]) { + newJars: HashMap[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies @@ -363,12 +361,6 @@ private[spark] class Executor( urlClassLoader.addURL(url) } } - for ((name, timestamp) <- newDirs if currentDirs.getOrElse(name, -1L) < timestamp) { - logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchHcfsDir(name, new File(SparkFiles.getRootDirectory), conf, - env.securityManager, hadoopConf) - currentDirs(name) = timestamp - } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 6620e1b8424e8..847a4912eec13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -124,19 +124,24 @@ private[spark] object Task { task: Task[_], currentFiles: HashMap[String, Long], currentJars: HashMap[String, Long], - currentDirs: HashMap[String, Long], serializer: SerializerInstance) : ByteBuffer = { val out = new ByteArrayOutputStream(4096) val dataOut = new DataOutputStream(out) - Array(currentFiles, currentJars, currentDirs).foreach { resourceMap => - dataOut.writeInt(resourceMap.size) - for ((name, timestamp) <- resourceMap) { - dataOut.writeUTF(name) - dataOut.writeLong(timestamp) - } + // Write currentFiles + dataOut.writeInt(currentFiles.size) + for ((name, timestamp) <- currentFiles) { + dataOut.writeUTF(name) + dataOut.writeLong(timestamp) + } + + // Write currentJars + dataOut.writeInt(currentJars.size) + for ((name, timestamp) <- currentJars) { + dataOut.writeUTF(name) + dataOut.writeLong(timestamp) } // Write the task itself and finish @@ -154,27 +159,27 @@ private[spark] object Task { * @return (taskFiles, taskJars, taskBytes) */ def deserializeWithDependencies(serializedTask: ByteBuffer) - : (HashMap[String, Long], HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { + : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = { val in = new ByteBufferInputStream(serializedTask) val dataIn = new DataInputStream(in) - def deserializeResourceMap(): HashMap[String, Long] = { - val map = new HashMap[String, Long]() - val numFiles = dataIn.readInt() - for (i <- 0 until numFiles) { - map(dataIn.readUTF()) = dataIn.readLong() - } - map + // Read task's files + val taskFiles = new HashMap[String, Long]() + val numFiles = dataIn.readInt() + for (i <- 0 until numFiles) { + taskFiles(dataIn.readUTF()) = dataIn.readLong() } - // Read task's files - val taskFiles = deserializeResourceMap() - val taskJars = deserializeResourceMap() - val taskDirs = deserializeResourceMap() + // Read task's JARs + val taskJars = new HashMap[String, Long]() + val numJars = dataIn.readInt() + for (i <- 0 until numJars) { + taskJars(dataIn.readUTF()) = dataIn.readLong() + } // Create a sub-buffer for the rest of the data, which is the serialized Task object val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task - (taskFiles, taskJars, taskDirs, subBuffer) + (taskFiles, taskJars, subBuffer) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4814b2b8a2afa..5c94c6bbcb37b 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -460,8 +460,7 @@ private[spark] class TaskSetManager( // Serialize and return the task val startTime = clock.getTime() val serializedTask: ByteBuffer = try { - Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, - sched.sc.addedDirs, ser) + Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 06bb7055cb63b..96af38ea9ad70 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -359,8 +359,10 @@ private[spark] object Utils extends Logging { } /** - * Download a file to target directory. Supports fetching the file in a variety of ways, - * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. + * Download a file or directory to target directory. Supports fetching the file in a variety of + * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based + * on the URL parameter. Fetching directories is only supported from Hadoop-compatible + * filesystems. * * If `useCache` is true, first attempts to fetch the file to a local cache that's shared * across executors running the same application. `useCache` is used mainly for @@ -429,7 +431,6 @@ private[spark] object Utils extends Logging { * * @param url URL that `sourceFile` originated from, for logging purposes. * @param in InputStream to download. - * @param tempFile File path to download `in` to. * @param destFile File path to move `tempFile` to. * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match * `sourceFile` @@ -437,9 +438,11 @@ private[spark] object Utils extends Logging { private def downloadFile( url: String, in: InputStream, - tempFile: File, destFile: File, fileOverwrite: Boolean): Unit = { + val tempFile = File.createTempFile("fetchFileTemp", null, + new File(destFile.getParentFile.getAbsolutePath)) + logInfo("Fetching " + url + " to " + tempFile) try { val out = new FileOutputStream(tempFile) @@ -518,8 +521,10 @@ private[spark] object Utils extends Logging { } /** - * Download a file to target directory. Supports fetching the file in a variety of ways, - * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter. + * Download a file or directory to target directory. Supports fetching the file in a variety of + * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based + * on the URL parameter. Fetching directories is only supported from Hadoop-compatible + * filesystems. * * Throws SparkException if the target file already exists and has different contents than * the requested file. 
@@ -531,14 +536,11 @@ private[spark] object Utils extends Logging { conf: SparkConf, securityMgr: SecurityManager, hadoopConf: Configuration) { - val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath)) val targetFile = new File(targetDir, filename) val uri = new URI(url) val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false) Option(uri.getScheme).getOrElse("file") match { case "http" | "https" | "ftp" => - logInfo("Fetching " + url + " to " + tempFile) - var uc: URLConnection = null if (securityMgr.isAuthenticationEnabled()) { logDebug("fetchFile with security enabled") @@ -555,50 +557,41 @@ private[spark] object Utils extends Logging { uc.setReadTimeout(timeout) uc.connect() val in = uc.getInputStream() - downloadFile(url, in, tempFile, targetFile, fileOverwrite) + downloadFile(url, in, targetFile, fileOverwrite) case "file" => // In the case of a local file, copy the local file to the target directory. // Note the difference between uri vs url. val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url) copyFile(url, sourceFile, targetFile, fileOverwrite) case _ => - // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val fs = getHadoopFileSystem(uri, hadoopConf) - val in = fs.open(new Path(uri)) - downloadFile(url, in, tempFile, targetFile, fileOverwrite) + val path = new Path(uri) + fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, securityMgr, hadoopConf, + fileOverwrite) } } /** - * Copies a remote directory from a Hadoop-compatible file system to local disk. + * Fetch a file or directory from a Hadoop-compatible filesystem. */ - def fetchHcfsDir( - url: String, - targetDir: File, - conf: SparkConf, - securityMgr: SecurityManager, - hadoopConf: Configuration): Unit = { - val uri = new URI(url) - val fs = getHadoopFileSystem(uri, hadoopConf) - val path = new Path(uri) - doFetchHcfsDir(path, new File(targetDir, path.getName), fs, conf, securityMgr, hadoopConf) - } - - def doFetchHcfsDir( + private def fetchHcfsFile( path: Path, targetDir: File, fs: FileSystem, conf: SparkConf, securityMgr: SecurityManager, - hadoopConf: Configuration): Unit = { + hadoopConf: Configuration, + fileOverwrite: Boolean): Unit = { targetDir.mkdir() fs.listStatus(path).foreach { fileStatus => val innerPath = fileStatus.getPath if (fileStatus.isDirectory) { - doFetchHcfsDir(innerPath, new File(targetDir, innerPath.getName), fs, conf, securityMgr, - hadoopConf) + fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, securityMgr, + hadoopConf, fileOverwrite) } else { - doFetchFile(innerPath.toString, targetDir, innerPath.getName, conf, securityMgr, hadoopConf) + val in = fs.open(innerPath) + val targetFile = new File(targetDir, innerPath.getName) + downloadFile(innerPath.toString, in, targetFile, fileOverwrite) } } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 84310496985a6..9520cc765dbab 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -88,7 +88,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { assert(alien2.createNewFile()) sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) - sc.addDirectory(neptune.getAbsolutePath) + sc.addFile(neptune.getAbsolutePath, true) sc.parallelize(Array(1), 1).map(x => { val sep = File.separator if 
(!new File(SparkFiles.get("neptune" + sep + "alien1")).exists()) { From 31f15a9594acbf7f996d8604a3d63a44c4f38c78 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 14 Jan 2015 11:31:25 -0800 Subject: [PATCH 3/9] Use cache recursively and fix some compile errors --- .../org/apache/spark/executor/Executor.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 31 +++++++++++++++++-- .../org/apache/spark/SparkContextSuite.scala | 3 +- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index caf9cffbc428f..0e1530f243713 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -171,7 +171,7 @@ private[spark] class Executor( startGCTime = gcTime try { - val (taskFiles, taskJars, taskDirs, taskBytes) = + val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 96af38ea9ad70..e84260c5f36c8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -481,7 +481,7 @@ private[spark] object Utils extends Logging { removeSourceFile: Boolean = false): Unit = { if (destFile.exists) { - if (!Files.equal(sourceFile, destFile)) { + if (!filesEqualRecursive(sourceFile, destFile)) { if (fileOverwrite) { logInfo( s"File $destFile exists and does not match contents of $url, replacing it with $url" @@ -516,7 +516,34 @@ private[spark] object Utils extends Logging { Files.move(sourceFile, destFile) } else { logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}") - Files.copy(sourceFile, destFile) + copyRecursive(sourceFile, destFile) + } + } + + private def filesEqualRecursive(file1: File, file2: File): Boolean = { + if (file1.isDirectory && file2.isDirectory) { + val subfiles1 = file1.listFiles() + val subfiles2 = file2.listFiles() + if (subfiles1.size != subfiles2.size) { + return false + } + subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).dropWhile { + case (f1, f2) => filesEqualRecursive(f1, f2) + }.isEmpty + } else if (file1.isFile && file2.isFile) { + Files.equal(file1, file2) + } else { + false + } + } + + private def copyRecursive(source: File, dest: File): Unit = { + if (source.isDirectory) { + dest.mkdir() + val subfiles = source.listFiles() + subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName))) + } else { + Files.copy(source, dest) } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 9520cc765dbab..a1cd6fba6c673 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark +import java.io.File + import org.scalatest.FunSuite import org.apache.hadoop.io.BytesWritable -import java.io.File class SparkContextSuite extends FunSuite with LocalSparkContext { From 1941be3b4759d9c5bcf8881c87256017ee50e40d Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 16 Jan 2015 01:20:38 -0800 Subject: [PATCH 4/9] Fix test and avoid HTTP server in local mode --- 
.../main/scala/org/apache/spark/SparkContext.scala | 13 ++++++++----- .../scala/org/apache/spark/SparkContextSuite.scala | 13 ++++++++++++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bcb791482f99f..aaa738efa4d6d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1018,6 +1018,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * supported for Hadoop-supported filesystems. */ def addFile(path: String, recursive: Boolean): Unit = { + val isLocalMode = conf.get("spark.master").startsWith("local") val uri = new URI(path) val schemeCorrectedPath = uri.getScheme match { case null | "local" => "file:" + uri.getPath @@ -1032,8 +1033,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli throw new SparkException(s"Added file $hadoopPath does not exist.") } val isDir = fs.isDirectory(hadoopPath) - if (scheme == "file" && isDir) { - throw new SparkException(s"addFile does not support adding local directories.") + if (!isLocalMode && scheme == "file" && isDir) { + throw new SparkException(s"addFile does not support local directories when not running " + + "local mode.") } if (!recursive && isDir) { throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " + @@ -1041,9 +1043,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - val key = scheme match { - case "file" => env.httpFileServer.addFile(new File(uri.getPath)) - case _ => path + val key = if (!isLocalMode && scheme == "file") { + env.httpFileServer.addFile(new File(uri.getPath)) + } else { + schemeCorrectedPath } val timestamp = System.currentTimeMillis addedFiles(key) = timestamp diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index a1cd6fba6c673..a2e66be0a5ff8 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -113,7 +113,18 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } } - test("addFile recursive can't add the same directory twice") { + test("addFile recursive can't add directories by default") { + val dir = new File("dir") + try { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addFile(dir.getAbsolutePath) + assert(false, "should have thrown exception") + } catch { + case _: SparkException => + } finally { + sc.stop() + dir.delete() + } } } From ca83849e79167de1772e153bc27b7a8f8b1d3e4f Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 16 Jan 2015 17:14:18 -0800 Subject: [PATCH 5/9] Add addFile test --- .../org/apache/spark/SparkContextSuite.scala | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index a2e66be0a5ff8..53a7fdbe5ffc8 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark -import java.io.File +import java.io.{File, PrintWriter} import org.scalatest.FunSuite @@ -75,6 +75,36 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { 
assert(byteArray2.length === 0) } + test("addFile works") { + val file = new File("somefile") + val absolutePath = file.getAbsolutePath + try { + val pw = new PrintWriter(file) + pw.print("somewords") + pw.close() + val length = file.length() + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addFile(file.getAbsolutePath) + sc.parallelize(Array(1), 1).map(x => { + val gotten = new File(SparkFiles.get(file.getName)) + if (!gotten.exists()) { + throw new SparkException("file doesn't exist") + } + if (length != gotten.length()) { + throw new SparkException( + s"file has different length $length than added file ${gotten.length()}") + } + if (absolutePath == gotten.getAbsolutePath) { + throw new SparkException("file should have been copied") + } + x + }).count() + } finally { + sc.stop() + file.delete() + } + } + test("addFile recursive works") { val pluto = new File("pluto") val neptune = new File(pluto, "neptune") From 38bf94dc8dcf39b9b5d8003f5f7d0e0d38bf35e8 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 28 Jan 2015 14:16:47 -0800 Subject: [PATCH 6/9] Marcelo's comments --- .../main/scala/org/apache/spark/SparkContext.scala | 4 ++-- .../main/scala/org/apache/spark/util/Utils.scala | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aaa738efa4d6d..1b4822bc2e7ff 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -34,7 +34,7 @@ import scala.reflect.{ClassTag, classTag} import akka.actor.Props import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, @@ -1008,7 +1008,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli addFile(path, false) } - /** + /** * Add a file to be downloaded with this Spark job on every node. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), or an HTTP, HTTPS or FTP URI. 
To access the file in Spark jobs, diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e84260c5f36c8..bca79ced35526 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -539,7 +539,9 @@ private[spark] object Utils extends Logging { private def copyRecursive(source: File, dest: File): Unit = { if (source.isDirectory) { - dest.mkdir() + if (!dest.mkdir()) { + throw new IOException(s"Failed to create directory ${dest.getPath}") + } val subfiles = source.listFiles() subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName))) } else { @@ -593,8 +595,7 @@ private[spark] object Utils extends Logging { case _ => val fs = getHadoopFileSystem(uri, hadoopConf) val path = new Path(uri) - fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, securityMgr, hadoopConf, - fileOverwrite) + fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite) } } @@ -606,15 +607,14 @@ private[spark] object Utils extends Logging { targetDir: File, fs: FileSystem, conf: SparkConf, - securityMgr: SecurityManager, hadoopConf: Configuration, fileOverwrite: Boolean): Unit = { targetDir.mkdir() fs.listStatus(path).foreach { fileStatus => val innerPath = fileStatus.getPath - if (fileStatus.isDirectory) { - fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, securityMgr, - hadoopConf, fileOverwrite) + if (fileStatus.isDir) { + fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf, + fileOverwrite) } else { val in = fs.open(innerPath) val targetFile = new File(targetDir, innerPath.getName) From 13da824dbaaa37529858f284ba97f381b2f96021 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 30 Jan 2015 13:00:21 -0800 Subject: [PATCH 7/9] Revert executor changes --- .../main/scala/org/apache/spark/executor/Executor.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0e1530f243713..312bb3a1daaa3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -50,7 +50,7 @@ private[spark] class Executor( logInfo(s"Starting executor ID $executorId on host $executorHostname") // Application dependencies (added through SparkContext) that we've fetched so far on this node. - // Each map holds the master's timestamp for the version of that file, JAR, or directory we got. + // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() @@ -171,8 +171,7 @@ private[spark] class Executor( startGCTime = gcTime try { - val (taskFiles, taskJars, taskBytes) = - Task.deserializeWithDependencies(serializedTask) + val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) @@ -334,9 +333,7 @@ private[spark] class Executor( * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. 
*/ - private def updateDependencies( - newFiles: HashMap[String, Long], - newJars: HashMap[String, Long]) { + private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies From 70cd24dc25e91ecd0b3d8562e0d5f57b2c5988a2 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 30 Jan 2015 16:34:02 -0800 Subject: [PATCH 8/9] Add another test --- .../scala/org/apache/spark/util/Utils.scala | 4 ++- .../org/apache/spark/SparkContextSuite.scala | 9 +++--- .../org/apache/spark/util/UtilsSuite.scala | 31 +++++++++++++++++++ 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bca79ced35526..577a97519e593 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -601,8 +601,10 @@ private[spark] object Utils extends Logging { /** * Fetch a file or directory from a Hadoop-compatible filesystem. + * + * Visible for testing */ - private def fetchHcfsFile( + private[spark] def fetchHcfsFile( path: Path, targetDir: File, fs: FileSystem, diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 53a7fdbe5ffc8..20771dd58521e 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark -import java.io.{File, PrintWriter} +import java.io.File + +import com.google.common.base.Charsets._ +import com.google.common.io.Files import org.scalatest.FunSuite @@ -79,9 +82,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val file = new File("somefile") val absolutePath = file.getAbsolutePath try { - val pw = new PrintWriter(file) - pw.print("somewords") - pw.close() + Files.write("somewords", file, UTF_8) val length = file.length() sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) sc.addFile(file.getAbsolutePath) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 4544382094f96..fe2b644251157 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -29,6 +29,9 @@ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.scalatest.FunSuite +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + import org.apache.spark.SparkConf class UtilsSuite extends FunSuite with ResetSystemProperties { @@ -381,4 +384,32 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { require(cnt === 2, "prepare should be called twice") require(time < 500, "preparation time should not count") } + + test("fetch hcfs dir") { + val tempDir = Utils.createTempDir() + val innerTempDir = Utils.createTempDir(tempDir.getPath) + val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir) + val targetDir = new File("target-dir") + Files.write("some text", tempFile, UTF_8) + + try { + val path = new Path("file://" + tempDir.getAbsolutePath) + val conf = new Configuration() + val fs = Utils.getHadoopFileSystem(path.toString, conf) + Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false) + 
assert(targetDir.exists()) + assert(targetDir.isDirectory()) + val newInnerDir = new File(targetDir, innerTempDir.getName) + println("inner temp dir: " + innerTempDir.getName) + targetDir.listFiles().map(_.getName).foreach(println) + assert(newInnerDir.exists()) + assert(newInnerDir.isDirectory()) + val newInnerFile = new File(newInnerDir, tempFile.getName) + assert(newInnerFile.exists()) + assert(newInnerFile.isFile()) + } finally { + Utils.deleteRecursively(tempDir) + Utils.deleteRecursively(targetDir) + } + } } From f9fc77fe770b84bc0b451d73d35d26a00200b604 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 2 Feb 2015 16:47:55 -0800 Subject: [PATCH 9/9] Josh's comments --- .../scala/org/apache/spark/SparkContext.scala | 7 ++- .../scala/org/apache/spark/util/Utils.scala | 10 +++-- .../org/apache/spark/SparkContextSuite.scala | 43 ++++++++----------- 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1b4822bc2e7ff..24c77eb39eb83 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1018,7 +1018,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * supported for Hadoop-supported filesystems. */ def addFile(path: String, recursive: Boolean): Unit = { - val isLocalMode = conf.get("spark.master").startsWith("local") val uri = new URI(path) val schemeCorrectedPath = uri.getScheme match { case null | "local" => "file:" + uri.getPath @@ -1030,10 +1029,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli if (!Array("http", "https", "ftp").contains(scheme)) { val fs = hadoopPath.getFileSystem(hadoopConfiguration) if (!fs.exists(hadoopPath)) { - throw new SparkException(s"Added file $hadoopPath does not exist.") + throw new FileNotFoundException(s"Added file $hadoopPath does not exist.") } val isDir = fs.isDirectory(hadoopPath) - if (!isLocalMode && scheme == "file" && isDir) { + if (!isLocal && scheme == "file" && isDir) { throw new SparkException(s"addFile does not support local directories when not running " + "local mode.") } @@ -1043,7 +1042,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - val key = if (!isLocalMode && scheme == "file") { + val key = if (!isLocal && scheme == "file") { env.httpFileServer.addFile(new File(uri.getPath)) } else { schemeCorrectedPath diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 577a97519e593..bf110d9519d29 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -442,7 +442,7 @@ private[spark] object Utils extends Logging { fileOverwrite: Boolean): Unit = { val tempFile = File.createTempFile("fetchFileTemp", null, new File(destFile.getParentFile.getAbsolutePath)) - logInfo("Fetching " + url + " to " + tempFile) + logInfo(s"Fetching $url to $tempFile") try { val out = new FileOutputStream(tempFile) @@ -527,9 +527,9 @@ private[spark] object Utils extends Logging { if (subfiles1.size != subfiles2.size) { return false } - subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).dropWhile { + subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall { case (f1, f2) => filesEqualRecursive(f1, f2) - }.isEmpty + } } else if (file1.isFile && file2.isFile) { Files.equal(file1, file2) } 
else { @@ -611,7 +611,9 @@ private[spark] object Utils extends Logging { conf: SparkConf, hadoopConf: Configuration, fileOverwrite: Boolean): Unit = { - targetDir.mkdir() + if (!targetDir.mkdir()) { + throw new IOException(s"Failed to create directory ${targetDir.getPath}") + } fs.listStatus(path).foreach { fileStatus => val innerPath = fileStatus.getPath if (fileStatus.isDir) { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 20771dd58521e..50f347f1954de 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -26,6 +26,8 @@ import org.scalatest.FunSuite import org.apache.hadoop.io.BytesWritable +import org.apache.spark.util.Utils + class SparkContextSuite extends FunSuite with LocalSparkContext { test("Only one SparkContext may be active at a time") { @@ -79,7 +81,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } test("addFile works") { - val file = new File("somefile") + val file = File.createTempFile("someprefix", "somesuffix") val absolutePath = file.getAbsolutePath try { Files.write("somewords", file, UTF_8) @@ -102,60 +104,49 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { }).count() } finally { sc.stop() - file.delete() } } test("addFile recursive works") { - val pluto = new File("pluto") - val neptune = new File(pluto, "neptune") - val saturn = new File(neptune, "saturn") - val alien1 = new File(neptune, "alien1") - val alien2 = new File(saturn, "alien2") + val pluto = Utils.createTempDir() + val neptune = Utils.createTempDir(pluto.getAbsolutePath) + val saturn = Utils.createTempDir(neptune.getAbsolutePath) + val alien1 = File.createTempFile("alien", "1", neptune) + val alien2 = File.createTempFile("alien", "2", saturn) try { - assert(neptune.mkdirs()) - assert(saturn.mkdir()) - assert(alien1.createNewFile()) - assert(alien2.createNewFile()) - sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) sc.addFile(neptune.getAbsolutePath, true) sc.parallelize(Array(1), 1).map(x => { val sep = File.separator - if (!new File(SparkFiles.get("neptune" + sep + "alien1")).exists()) { + if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) { throw new SparkException("can't access file under root added directory") } - if (!new File(SparkFiles.get("neptune" + sep + "saturn" + sep + "alien2")).exists()) { + if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName)) + .exists()) { throw new SparkException("can't access file in nested directory") } - if (new File(SparkFiles.get("pluto" + sep + "neptune" + sep + "alien1")).exists()) { + if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName)) + .exists()) { throw new SparkException("file exists that shouldn't") } x }).count() } finally { sc.stop() - alien2.delete() - saturn.delete() - alien1.delete() - neptune.delete() - pluto.delete() } } test("addFile recursive can't add directories by default") { - val dir = new File("dir") + val dir = Utils.createTempDir() try { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) - sc.addFile(dir.getAbsolutePath) - assert(false, "should have thrown exception") - } catch { - case _: SparkException => + intercept[SparkException] { + sc.addFile(dir.getAbsolutePath) + } } finally { sc.stop() - dir.delete() } } }
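
A minimal usage sketch of the API as the series leaves it after patch 9: `addFile(path, recursive = true)` accepts a directory on a Hadoop-compatible filesystem, each executor fetches it under its own name below `SparkFiles.getRootDirectory()`, and task code resolves nested files through `SparkFiles.get`. The application name, the hdfs:///tmp/lookup-tables location, and the directory contents below are assumptions for illustration, not paths taken from the patches; the sketch presumes it is submitted against a cluster where that directory exists.

import java.io.File

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileRecursiveSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("addFile-recursive-sketch"))

    // Directories require recursive = true; calling the single-argument overload on a
    // directory, or adding a local (file:) directory on a non-local master, throws
    // SparkException, and a missing path raises FileNotFoundException (patch 9).
    sc.addFile("hdfs:///tmp/lookup-tables", recursive = true)

    val fileCounts = sc.parallelize(1 to 4, 2).map { _ =>
      // On the executor the directory is downloaded under its own name, so files keep
      // their position relative to it, as exercised by the "addFile recursive works" test.
      val dir = new File(SparkFiles.get("lookup-tables"))
      dir.listFiles().length
    }.collect()

    println(fileCounts.mkString(","))
    sc.stop()
  }
}

Keeping the fetched directory under its own name below the download root is what lets task code address nested files with stable relative paths (for example `SparkFiles.get("lookup-tables/part-00000")` for a hypothetical nested file), the same layout the recursive test checks with `neptune.getName + sep + alien1.getName`.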