From f1bf4f0385a8e5da14a1d4b01bbbea17b98c4aa3 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 28 Dec 2012 16:13:23 -0800
Subject: [PATCH 1/4] Skip deletion of files in clearFiles().

This fixes an issue where Spark could delete original files in the current
working directory that were added to the job using addFile().

There was also the potential for addFile() to overwrite local files, which is
addressed by changing Utils.fetchFile() to log a warning instead of
overwriting a file with new contents.

This is a short-term fix; a better long-term solution would be to remove the
dependence on storing files in the current working directory, since we can't
change the cwd from Java.
---
 core/src/main/scala/spark/SparkContext.scala |  9 ++--
 core/src/main/scala/spark/Utils.scala        | 57 ++++++++++++++------
 2 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0afab522af..4fd81bc63b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -419,8 +419,9 @@ class SparkContext(
     }
     addedFiles(key) = System.currentTimeMillis
 
-    // Fetch the file locally in case the task is executed locally
-    val filename = new File(path.split("/").last)
+    // Fetch the file locally in case a job is executed locally.
+    // Jobs that run through LocalScheduler will already fetch the required dependencies,
+    // but jobs run in DAGScheduler.runLocally() will not, so we must fetch the files here.
     Utils.fetchFile(path, new File("."))
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
@@ -437,11 +438,10 @@ class SparkContext(
   }
 
   /**
-   * Clear the job's list of files added by `addFile` so that they do not get donwloaded to
+   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
    */
   def clearFiles() {
-    addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
     addedFiles.clear()
   }
 
@@ -465,7 +465,6 @@ class SparkContext(
    * any new nodes.
    */
   def clearJars() {
-    addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
     addedJars.clear()
   }
 
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 6d64b32174..c10b415a93 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.io.Source
+import com.google.common.io.Files
 
 /**
  * Various utility methods used by Spark.
@@ -130,28 +131,47 @@ private object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
+    val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + targetFile)
+        logInfo("Fetching " + url + " to " + tempFile)
         val in = new URL(url).openStream()
-        val out = new FileOutputStream(targetFile)
+        val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
+          tempFile.delete()
+        } else {
+          Files.move(tempFile, targetFile)
+        }
       case "file" | null =>
-        // Remove the file if it already exists
-        targetFile.delete()
-        // Symlink the file locally.
-        if (uri.isAbsolute) {
-          // url is absolute, i.e. it starts with "file:///". Extract the source
-          // file's absolute path from the url.
-          val sourceFile = new File(uri)
-          logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-          FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+        val sourceFile = if (uri.isAbsolute) {
+          new File(uri)
+        } else {
+          new File(url)
+        }
+        if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
         } else {
-          // url is not absolute, i.e. itself is the path to the source file.
-          logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
-          FileUtil.symLink(url, targetFile.getAbsolutePath)
+          // Remove the file if it already exists
+          targetFile.delete()
+          // Symlink the file locally.
+          if (uri.isAbsolute) {
+            // url is absolute, i.e. it starts with "file:///". Extract the source
+            // file's absolute path from the url.
+            val sourceFile = new File(uri)
+            logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+            FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+          } else {
+            // url is not absolute, i.e. itself is the path to the source file.
+            logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
+            FileUtil.symLink(url, targetFile.getAbsolutePath)
+          }
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -159,8 +179,15 @@ private object Utils extends Logging {
         val conf = new Configuration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
-        val out = new FileOutputStream(targetFile)
+        val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
+        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
+          logWarning("File " + targetFile + " exists and does not match contents of " + url +
+            "; using existing version")
+          tempFile.delete()
+        } else {
+          Files.move(tempFile, targetFile)
+        }
     }
     // Decompress the file if it's a .tar or .tar.gz
     if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {

From bd237d4a9d7f08eb143b2a2b8636a6a8453225ea Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 28 Dec 2012 16:14:36 -0800
Subject: [PATCH 2/4] Add synchronization to LocalScheduler.updateDependencies().
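
LocalScheduler executes tasks on a fixed-size thread pool, so several tasks
can enter updateDependencies() at once. Without a lock, two threads can both
observe a stale timestamp in currentFiles, fetch the same file concurrently,
and then race inside Utils.fetchFile(). A minimal sketch of the guarded
check-then-act pattern this change introduces (editorial, not part of the
patch: DependencyTracker and its fetch parameter are hypothetical stand-ins
for LocalScheduler and Utils.fetchFile):

  import scala.collection.mutable.HashMap

  // Hypothetical model of the scheduler's dependency bookkeeping. Wrapping
  // the loop in `synchronized` makes the timestamp check and the map update
  // atomic with respect to the other task threads sharing this instance.
  class DependencyTracker(fetch: String => Unit) {
    private val currentFiles = new HashMap[String, Long]

    def updateFiles(newFiles: HashMap[String, Long]) {
      synchronized {
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          fetch(name)  // at most one thread fetches at a time
          currentFiles(name) = timestamp
        }
      }
    }
  }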
---
 .../scheduler/local/LocalScheduler.scala      | 34 ++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index eb20fe41b2..5d927efb65 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -108,22 +108,24 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    */
   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
-    // Fetch missing dependencies
-    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
-      logInfo("Fetching " + name + " with timestamp " + timestamp)
-      Utils.fetchFile(name, new File("."))
-      currentFiles(name) = timestamp
-    }
-    for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
-      logInfo("Fetching " + name + " with timestamp " + timestamp)
-      Utils.fetchFile(name, new File("."))
-      currentJars(name) = timestamp
-      // Add it to our class loader
-      val localName = name.split("/").last
-      val url = new File(".", localName).toURI.toURL
-      if (!classLoader.getURLs.contains(url)) {
-        logInfo("Adding " + url + " to class loader")
-        classLoader.addURL(url)
+    this.synchronized {
+      // Fetch missing dependencies
+      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File("."))
+        currentFiles(name) = timestamp
+      }
+      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File("."))
+        currentJars(name) = timestamp
+        // Add it to our class loader
+        val localName = name.split("/").last
+        val url = new File(".", localName).toURI.toURL
+        if (!classLoader.getURLs.contains(url)) {
+          logInfo("Adding " + url + " to class loader")
+          classLoader.addURL(url)
+        }
       }
     }
   }

From d64fa72d2e4a8290d15e65459337f544e55b3b48 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 28 Dec 2012 16:20:38 -0800
Subject: [PATCH 3/4] Add addFile() and addJar() to JavaSparkContext.

---
 .../spark/api/java/JavaSparkContext.scala | 34 +++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index edbb187b1b..b7725313c4 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -301,6 +301,40 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * (in that order of preference). If neither of these is set, return None.
    */
   def getSparkHome(): Option[String] = sc.getSparkHome()
+
+  /**
+   * Add a file to be downloaded into the working directory of this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI.
+   */
+  def addFile(path: String) {
+    sc.addFile(path)
+  }
+
+  /**
+   * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI.
+   */
+  def addJar(path: String) {
+    sc.addJar(path)
+  }
+
+  /**
+   * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
+   * any new nodes.
+   */
+  def clearJars() {
+    sc.clearJars()
+  }
+
+  /**
+   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
+   * any new nodes.
+   */
+  def clearFiles() {
+    sc.clearFiles()
+  }
 }
 
 object JavaSparkContext {

From 397e67103c18ba22c8c63e9692f0096cd0094797 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 28 Dec 2012 17:37:13 -0800
Subject: [PATCH 4/4] Change Utils.fetchFile() warning to SparkException.

---
 core/src/main/scala/spark/Utils.scala          | 15 +++++++++------
 .../spark/scheduler/local/LocalScheduler.scala |  2 +-
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index c10b415a93..0e7007459d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -128,6 +128,9 @@ private object Utils extends Logging {
   /**
    * Download a file requested by the executor. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
@@ -142,9 +145,9 @@ private object Utils extends Logging {
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          logWarning("File " + targetFile + " exists and does not match contents of " + url +
-            "; using existing version")
           tempFile.delete()
+          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+            " " + url)
         } else {
           Files.move(tempFile, targetFile)
         }
@@ -155,8 +158,8 @@ private object Utils extends Logging {
           new File(url)
         }
         if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
-          logWarning("File " + targetFile + " exists and does not match contents of " + url +
-            "; using existing version")
+          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+            " " + url)
         } else {
           // Remove the file if it already exists
           targetFile.delete()
@@ -182,9 +185,9 @@ private object Utils extends Logging {
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          logWarning("File " + targetFile + " exists and does not match contents of " + url +
-            "; using existing version")
           tempFile.delete()
+          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
+            " " + url)
         } else {
           Files.move(tempFile, targetFile)
         }
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 5d927efb65..2593c0e3a0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -108,7 +108,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    */
   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
-    this.synchronized {
+    synchronized {
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
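
Taken together, patches 1 and 4 replace "write straight to the target path"
with a download-to-temp, compare, then move sequence: the file is fetched into
a temporary file under spark.local.dir (falling back to java.io.tmpdir),
compared against any existing target with Guava's Files.equal(), and only then
moved into place, or, after patch 4, rejected with a SparkException on a
content mismatch. A minimal sketch of that sequence, assuming Guava is on the
classpath; fetchOnce and its download parameter are hypothetical names, and a
plain Exception stands in for SparkException:

  import java.io.File
  import com.google.common.io.Files

  object FetchSketch {
    // Download into a temp file first so a partially written target is never
    // observed; keep an existing identical file, fail on a mismatch, and
    // otherwise move the finished download into place.
    def fetchOnce(download: File => Unit, targetDir: File, filename: String, tempDir: File) {
      val tempFile = File.createTempFile("fetchFileTemp", null, tempDir)
      val targetFile = new File(targetDir, filename)
      download(tempFile)
      if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
        tempFile.delete()
        throw new Exception("File " + targetFile + " exists and does not match " + filename)
      } else if (targetFile.exists) {
        tempFile.delete()  // identical contents already present; discard the copy
      } else {
        Files.move(tempFile, targetFile)
      }
    }
  }

One difference from the patch itself: the patch's else branch calls
Files.move() even when an identical target already exists, while the sketch
discards the duplicate instead, which avoids renaming over an existing file.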