
Commit c22ccc0

sryza authored and JoshRosen committed
SPARK-4687. Add a recursive option to the addFile API
This adds a recursive option to the addFile API to satisfy Hive's needs. It only allows specifying HDFS dirs that will be copied down on every executor.

There are a couple outstanding questions:

* Should we allow specifying local dirs as well? The best way to do this would probably be to archive them. The drawback is that it would require a fair bit of code that I don't know of any current use cases for.
* The addFiles implementation has a caching component that I don't entirely understand. What events are we caching between? AFAICT it's users calling addFile on the same file in the same app at different times? Do we want/need to add something similar for addDirectory.
* The addFiles implementation will check to see if an added file already exists and has the same contents. I imagine we want the same behavior, so planning to add this unless people think otherwise.

I plan to add some tests if people are OK with the approach.

Author: Sandy Ryza <[email protected]>

Closes #3670 from sryza/sandy-spark-4687 and squashes the following commits:

f9fc77f [Sandy Ryza] Josh's comments
70cd24d [Sandy Ryza] Add another test
13da824 [Sandy Ryza] Revert executor changes
38bf94d [Sandy Ryza] Marcelo's comments
ca83849 [Sandy Ryza] Add addFile test
1941be3 [Sandy Ryza] Fix test and avoid HTTP server in local mode
31f15a9 [Sandy Ryza] Use cache recursively and fix some compile errors
0239c3d [Sandy Ryza] Change addDirectory to addFile with recursive
46fe70a [Sandy Ryza] SPARK-4687. Add a addDirectory API

(cherry picked from commit c4b1108)

Signed-off-by: Josh Rosen <[email protected]>
1 parent 4074674 commit c22ccc0
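
For readers new to this API, here is a minimal, hypothetical usage sketch of what the change enables. The HDFS path and directory name below are made up for illustration; only the two-argument `addFile(path, recursive)` from this commit and the existing `SparkFiles.get` are real Spark APIs:

import java.io.File

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileRecursiveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("addFile-recursive-example"))

    // Hypothetical HDFS directory; with recursive = true the whole directory tree
    // is copied down to every executor's work directory (directories require recursive = true).
    sc.addFile("hdfs:///user/hive/aux-config", true)

    val fileCounts = sc.parallelize(1 to 4, 4).map { _ =>
      // On executors, SparkFiles.get resolves the fetched copy by its last path component.
      new File(SparkFiles.get("aux-config")).listFiles().length
    }.collect()

    println(fileCounts.mkString(", "))
    sc.stop()
  }
}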

File tree: 4 files changed (+237, −27 lines)

  core/src/main/scala/org/apache/spark/SparkContext.scala
  core/src/main/scala/org/apache/spark/util/Utils.scala
  core/src/test/scala/org/apache/spark/SparkContextSuite.scala
  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 56 additions & 12 deletions
@@ -25,29 +25,37 @@ import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.UUID.randomUUID
+
 import scala.collection.{Map, Set}
 import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
+
+import akka.actor.Props
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
 import org.apache.mesos.MesosNativeLibrary
-import akka.actor.Props

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
-import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
+import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
+  FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
@@ -1016,12 +1024,48 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
    * use `SparkFiles.get(fileName)` to find its download location.
    */
-  def addFile(path: String) {
+  def addFile(path: String): Unit = {
+    addFile(path, false)
+  }
+
+  /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   * A directory can be given if the recursive option is set to true. Currently directories are only
+   * supported for Hadoop-supported filesystems.
+   */
+  def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new URI(path)
-    val key = uri.getScheme match {
-      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case "local" => "file:" + uri.getPath
-      case _ => path
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => "file:" + uri.getPath
+      case _ => path
+    }
+
+    val hadoopPath = new Path(schemeCorrectedPath)
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    if (!Array("http", "https", "ftp").contains(scheme)) {
+      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+      if (!fs.exists(hadoopPath)) {
+        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
+      }
+      val isDir = fs.isDirectory(hadoopPath)
+      if (!isLocal && scheme == "file" && isDir) {
+        throw new SparkException(s"addFile does not support local directories when not running " +
+          "local mode.")
+      }
+      if (!recursive && isDir) {
+        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
+          "turned on.")
+      }
+    }
+
+    val key = if (!isLocal && scheme == "file") {
+      env.httpFileServer.addFile(new File(uri.getPath))
+    } else {
+      schemeCorrectedPath
     }
     val timestamp = System.currentTimeMillis
     addedFiles(key) = timestamp
@@ -1633,8 +1677,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val schedulingMode = getSchedulingMode.toString
     val addedJarPaths = addedJars.keys.toSeq
     val addedFilePaths = addedFiles.keys.toSeq
-    val environmentDetails =
-      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
+    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
+      addedFilePaths)
     val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
     listenerBus.post(environmentUpdate)
   }
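
As a reading aid for the hunk above, the following is a standalone sketch (not part of the patch) that mirrors only the scheme handling in the new `addFile(path, recursive)`: null and `local:` schemes are rewritten to `file:`, http/https/ftp URIs bypass the Hadoop filesystem checks, and local files in non-local mode are the only case that still goes through the HTTP file server. The object, method name, and example paths are invented for illustration:

import java.net.URI

object AddFileSchemeSketch {
  // Simplified, dependency-free mirror of the branching in addFile(path, recursive);
  // the FileSystem existence/directory checks from the real code are elided here.
  def describeAddFile(path: String, isLocalMode: Boolean): String = {
    val uri = new URI(path)
    val schemeCorrectedPath = uri.getScheme match {
      case null | "local" => "file:" + uri.getPath  // treat as a plain local path
      case _ => path
    }
    val scheme = new URI(schemeCorrectedPath).getScheme
    if (Array("http", "https", "ftp").contains(scheme)) {
      s"remote URI, registered as-is; executors download it themselves: $path"
    } else if (scheme == "file" && !isLocalMode) {
      s"local file, served to executors via the HTTP file server: $schemeCorrectedPath"
    } else {
      s"path registered directly; executors fetch it when they need it: $schemeCorrectedPath"
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs, for illustration only.
    println(describeAddFile("/tmp/lookup.txt", isLocalMode = false))
    println(describeAddFile("hdfs:///data/dicts", isLocalMode = false))
    println(describeAddFile("local:/opt/static/ref.dat", isLocalMode = true))
  }
}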

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 73 additions & 15 deletions
@@ -386,8 +386,10 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
    * across executors running the same application. `useCache` is used mainly for
@@ -456,17 +458,18 @@ private[spark] object Utils extends Logging {
    *
    * @param url URL that `sourceFile` originated from, for logging purposes.
    * @param in InputStream to download.
-   * @param tempFile File path to download `in` to.
    * @param destFile File path to move `tempFile` to.
    * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
    *                      `sourceFile`
    */
   private def downloadFile(
       url: String,
       in: InputStream,
-      tempFile: File,
       destFile: File,
       fileOverwrite: Boolean): Unit = {
+    val tempFile = File.createTempFile("fetchFileTemp", null,
+      new File(destFile.getParentFile.getAbsolutePath))
+    logInfo(s"Fetching $url to $tempFile")

     try {
       val out = new FileOutputStream(tempFile)
@@ -505,7 +508,7 @@ private[spark] object Utils extends Logging {
       removeSourceFile: Boolean = false): Unit = {

     if (destFile.exists) {
-      if (!Files.equal(sourceFile, destFile)) {
+      if (!filesEqualRecursive(sourceFile, destFile)) {
         if (fileOverwrite) {
           logInfo(
             s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -540,13 +543,44 @@ private[spark] object Utils extends Logging {
       Files.move(sourceFile, destFile)
     } else {
       logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
-      Files.copy(sourceFile, destFile)
+      copyRecursive(sourceFile, destFile)
+    }
+  }
+
+  private def filesEqualRecursive(file1: File, file2: File): Boolean = {
+    if (file1.isDirectory && file2.isDirectory) {
+      val subfiles1 = file1.listFiles()
+      val subfiles2 = file2.listFiles()
+      if (subfiles1.size != subfiles2.size) {
+        return false
+      }
+      subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
+        case (f1, f2) => filesEqualRecursive(f1, f2)
+      }
+    } else if (file1.isFile && file2.isFile) {
+      Files.equal(file1, file2)
+    } else {
+      false
+    }
+  }
+
+  private def copyRecursive(source: File, dest: File): Unit = {
+    if (source.isDirectory) {
+      if (!dest.mkdir()) {
+        throw new IOException(s"Failed to create directory ${dest.getPath}")
+      }
+      val subfiles = source.listFiles()
+      subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
+    } else {
+      Files.copy(source, dest)
     }
   }

   /**
-   * Download a file to target directory. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   * Download a file or directory to target directory. Supports fetching the file in a variety of
+   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
+   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
+   * filesystems.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
@@ -558,14 +592,11 @@ private[spark] object Utils extends Logging {
       conf: SparkConf,
       securityMgr: SecurityManager,
       hadoopConf: Configuration) {
-    val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
     Option(uri.getScheme).getOrElse("file") match {
       case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + tempFile)
-
         var uc: URLConnection = null
         if (securityMgr.isAuthenticationEnabled()) {
           logDebug("fetchFile with security enabled")
@@ -583,17 +614,44 @@ private[spark] object Utils extends Logging {
         uc.setReadTimeout(timeout)
         uc.connect()
         val in = uc.getInputStream()
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        downloadFile(url, in, targetFile, fileOverwrite)
       case "file" =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
         copyFile(url, sourceFile, targetFile, fileOverwrite)
       case _ =>
-        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val fs = getHadoopFileSystem(uri, hadoopConf)
-        val in = fs.open(new Path(uri))
-        downloadFile(url, in, tempFile, targetFile, fileOverwrite)
+        val path = new Path(uri)
+        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+    }
+  }
+
+  /**
+   * Fetch a file or directory from a Hadoop-compatible filesystem.
+   *
+   * Visible for testing
+   */
+  private[spark] def fetchHcfsFile(
+      path: Path,
+      targetDir: File,
+      fs: FileSystem,
+      conf: SparkConf,
+      hadoopConf: Configuration,
+      fileOverwrite: Boolean): Unit = {
+    if (!targetDir.mkdir()) {
+      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
+    }
+    fs.listStatus(path).foreach { fileStatus =>
+      val innerPath = fileStatus.getPath
+      if (fileStatus.isDir) {
+        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
+          fileOverwrite)
+      } else {
+        val in = fs.open(innerPath)
+        val targetFile = new File(targetDir, innerPath.getName)
+        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
+      }
     }
   }
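
The `fetchHcfsFile` helper above is essentially a depth-first walk over `FileSystem.listStatus`. Below is a minimal standalone sketch of the same technique, assuming a local `file://` source so it runs without a cluster; the object and method names are invented, and `FileUtil.copy` stands in for the stream-based `downloadFile`, so the overwrite and temp-file handling of the real code is omitted:

import java.io.{File, IOException}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object HcfsFetchSketch {
  // Depth-first copy of a (possibly nested) directory from any Hadoop-compatible
  // filesystem to a local target directory, mirroring the structure of fetchHcfsFile.
  def fetchDir(path: Path, targetDir: File, fs: FileSystem): Unit = {
    if (!targetDir.mkdir()) {
      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
    }
    fs.listStatus(path).foreach { status =>
      val inner = status.getPath
      if (status.isDirectory) {
        fetchDir(inner, new File(targetDir, inner.getName), fs)
      } else {
        // FileUtil.copy copies a single file; it does not check for existing content.
        FileUtil.copy(fs, inner, new File(targetDir, inner.getName), false, fs.getConf)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Illustrative local paths; any Hadoop-supported scheme (hdfs://, s3://, ...) works too.
    val source = new Path("file:///tmp/source-dir")
    val fs = source.getFileSystem(new Configuration())
    fetchDir(source, new File("/tmp/fetched-copy"), fs)
  }
}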

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 77 additions & 0 deletions
@@ -17,10 +17,17 @@

 package org.apache.spark

+import java.io.File
+
+import com.google.common.base.Charsets._
+import com.google.common.io.Files
+
 import org.scalatest.FunSuite

 import org.apache.hadoop.io.BytesWritable

+import org.apache.spark.util.Utils
+
 class SparkContextSuite extends FunSuite with LocalSparkContext {

   test("Only one SparkContext may be active at a time") {
@@ -72,4 +79,74 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
+
+  test("addFile works") {
+    val file = File.createTempFile("someprefix", "somesuffix")
+    val absolutePath = file.getAbsolutePath
+    try {
+      Files.write("somewords", file, UTF_8)
+      val length = file.length()
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addFile(file.getAbsolutePath)
+      sc.parallelize(Array(1), 1).map(x => {
+        val gotten = new File(SparkFiles.get(file.getName))
+        if (!gotten.exists()) {
+          throw new SparkException("file doesn't exist")
+        }
+        if (length != gotten.length()) {
+          throw new SparkException(
+            s"file has different length $length than added file ${gotten.length()}")
+        }
+        if (absolutePath == gotten.getAbsolutePath) {
+          throw new SparkException("file should have been copied")
+        }
+        x
+      }).count()
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("addFile recursive works") {
+    val pluto = Utils.createTempDir()
+    val neptune = Utils.createTempDir(pluto.getAbsolutePath)
+    val saturn = Utils.createTempDir(neptune.getAbsolutePath)
+    val alien1 = File.createTempFile("alien", "1", neptune)
+    val alien2 = File.createTempFile("alien", "2", saturn)
+
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      sc.addFile(neptune.getAbsolutePath, true)
+      sc.parallelize(Array(1), 1).map(x => {
+        val sep = File.separator
+        if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
+          throw new SparkException("can't access file under root added directory")
+        }
+        if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
+            .exists()) {
+          throw new SparkException("can't access file in nested directory")
+        }
+        if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
+            .exists()) {
+          throw new SparkException("file exists that shouldn't")
+        }
+        x
+      }).count()
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("addFile recursive can't add directories by default") {
+    val dir = Utils.createTempDir()
+
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      intercept[SparkException] {
+        sc.addFile(dir.getAbsolutePath)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
 }

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -29,6 +29,9 @@ import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.scalatest.FunSuite

+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.SparkConf

 class UtilsSuite extends FunSuite with ResetSystemProperties {
@@ -381,4 +384,32 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     require(cnt === 2, "prepare should be called twice")
     require(time < 500, "preparation time should not count")
   }
+
+  test("fetch hcfs dir") {
+    val tempDir = Utils.createTempDir()
+    val innerTempDir = Utils.createTempDir(tempDir.getPath)
+    val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
+    val targetDir = new File("target-dir")
+    Files.write("some text", tempFile, UTF_8)
+
+    try {
+      val path = new Path("file://" + tempDir.getAbsolutePath)
+      val conf = new Configuration()
+      val fs = Utils.getHadoopFileSystem(path.toString, conf)
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      assert(targetDir.exists())
+      assert(targetDir.isDirectory())
+      val newInnerDir = new File(targetDir, innerTempDir.getName)
+      println("inner temp dir: " + innerTempDir.getName)
+      targetDir.listFiles().map(_.getName).foreach(println)
+      assert(newInnerDir.exists())
+      assert(newInnerDir.isDirectory())
+      val newInnerFile = new File(newInnerDir, tempFile.getName)
+      assert(newInnerFile.exists())
+      assert(newInnerFile.isFile())
+    } finally {
+      Utils.deleteRecursively(tempDir)
+      Utils.deleteRecursively(targetDir)
+    }
+  }
 }
