Skip to content

Commit 3fc74f4

Browse files
Marcelo Vanzin authored and Andrew Or committed
[SPARK-6144] [core] Fix addFile when source files are on "hdfs:"
The code failed in two modes: it complained when it tried to re-create a directory that already existed, and it was placing some files in the wrong parent directory. The patch fixes both issues.

Author: Marcelo Vanzin <[email protected]>
Author: trystanleftwich <[email protected]>

Closes #4894 from vanzin/SPARK-6144 and squashes the following commits:

100b3a1 [Marcelo Vanzin] Style fix.
58266aa [Marcelo Vanzin] Fix fetchHcfs file for directories.
91733b7 [trystanleftwich] [SPARK-6144] When in cluster mode, using ADD JAR with an hdfs:// sourced jar will fail

(cherry picked from commit 3a35a0d)
Signed-off-by: Andrew Or <[email protected]>
1 parent bfa4e31 commit 3fc74f4

File tree

2 files changed

+63
-50
lines changed

2 files changed

+63
-50
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,8 @@ private[spark] object Utils extends Logging {
617617
case _ =>
618618
val fs = getHadoopFileSystem(uri, hadoopConf)
619619
val path = new Path(uri)
620-
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
620+
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
621+
filename = Some(filename))
621622
}
622623
}
623624

@@ -632,19 +633,22 @@ private[spark] object Utils extends Logging {
632633
fs: FileSystem,
633634
conf: SparkConf,
634635
hadoopConf: Configuration,
635-
fileOverwrite: Boolean): Unit = {
636-
if (!targetDir.mkdir()) {
636+
fileOverwrite: Boolean,
637+
filename: Option[String] = None): Unit = {
638+
if (!targetDir.exists() && !targetDir.mkdir()) {
637639
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
638640
}
639-
fs.listStatus(path).foreach { fileStatus =>
640-
val innerPath = fileStatus.getPath
641-
if (fileStatus.isDir) {
642-
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
643-
fileOverwrite)
644-
} else {
645-
val in = fs.open(innerPath)
646-
val targetFile = new File(targetDir, innerPath.getName)
647-
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
641+
val dest = new File(targetDir, filename.getOrElse(path.getName))
642+
if (fs.isFile(path)) {
643+
val in = fs.open(path)
644+
try {
645+
downloadFile(path.toString, in, dest, fileOverwrite)
646+
} finally {
647+
in.close()
648+
}
649+
} else {
650+
fs.listStatus(path).foreach { fileStatus =>
651+
fetchHcfsFile(fileStatus.getPath(), dest, fs, conf, hadoopConf, fileOverwrite)
648652
}
649653
}
650654
}

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
208208
child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
209209

210210
// although child1 is old, child2 is still new so return true
211-
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
211+
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
212212

213213
child2.setLastModified(System.currentTimeMillis - (1000 * 30))
214-
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
214+
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
215215

216216
parent.setLastModified(System.currentTimeMillis - (1000 * 30))
217217
// although parent and its immediate children are new, child3 is still old
218218
// we expect a full recursive search for new files.
219-
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
219+
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
220220

221221
child3.setLastModified(System.currentTimeMillis - (1000 * 30))
222-
assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
222+
assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
223223
}
224224

225225
test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
339339
assert(!tempDir1.exists())
340340

341341
val tempDir2 = Utils.createTempDir()
342-
val tempFile1 = new File(tempDir2, "foo.txt")
343-
Files.touch(tempFile1)
344-
assert(tempFile1.exists())
345-
Utils.deleteRecursively(tempFile1)
346-
assert(!tempFile1.exists())
342+
val sourceFile1 = new File(tempDir2, "foo.txt")
343+
Files.touch(sourceFile1)
344+
assert(sourceFile1.exists())
345+
Utils.deleteRecursively(sourceFile1)
346+
assert(!sourceFile1.exists())
347347

348348
val tempDir3 = new File(tempDir2, "subdir")
349349
assert(tempDir3.mkdir())
350-
val tempFile2 = new File(tempDir3, "bar.txt")
351-
Files.touch(tempFile2)
352-
assert(tempFile2.exists())
350+
val sourceFile2 = new File(tempDir3, "bar.txt")
351+
Files.touch(sourceFile2)
352+
assert(sourceFile2.exists())
353353
Utils.deleteRecursively(tempDir2)
354354
assert(!tempDir2.exists())
355355
assert(!tempDir3.exists())
356-
assert(!tempFile2.exists())
356+
assert(!sourceFile2.exists())
357357
}
358358

359359
test("loading properties from file") {
@@ -386,30 +386,39 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
386386
}
387387

388388
test("fetch hcfs dir") {
389-
val tempDir = Utils.createTempDir()
390-
val innerTempDir = Utils.createTempDir(tempDir.getPath)
391-
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
392-
val targetDir = new File("target-dir")
393-
Files.write("some text", tempFile, UTF_8)
394-
395-
try {
396-
val path = new Path("file://" + tempDir.getAbsolutePath)
397-
val conf = new Configuration()
398-
val fs = Utils.getHadoopFileSystem(path.toString, conf)
399-
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
400-
assert(targetDir.exists())
401-
assert(targetDir.isDirectory())
402-
val newInnerDir = new File(targetDir, innerTempDir.getName)
403-
println("inner temp dir: " + innerTempDir.getName)
404-
targetDir.listFiles().map(_.getName).foreach(println)
405-
assert(newInnerDir.exists())
406-
assert(newInnerDir.isDirectory())
407-
val newInnerFile = new File(newInnerDir, tempFile.getName)
408-
assert(newInnerFile.exists())
409-
assert(newInnerFile.isFile())
410-
} finally {
411-
Utils.deleteRecursively(tempDir)
412-
Utils.deleteRecursively(targetDir)
413-
}
389+
val sourceDir = Utils.createTempDir()
390+
val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath)
391+
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
392+
val targetDir = new File(Utils.createTempDir(), "target-dir")
393+
Files.write("some text", sourceFile, UTF_8)
394+
395+
val path = new Path("file://" + sourceDir.getAbsolutePath)
396+
val conf = new Configuration()
397+
val fs = Utils.getHadoopFileSystem(path.toString, conf)
398+
399+
assert(!targetDir.isDirectory())
400+
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
401+
assert(targetDir.isDirectory())
402+
403+
// Copy again to make sure it doesn't error if the dir already exists.
404+
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
405+
406+
val destDir = new File(targetDir, sourceDir.getName())
407+
assert(destDir.isDirectory())
408+
409+
val destInnerDir = new File(destDir, innerSourceDir.getName)
410+
assert(destInnerDir.isDirectory())
411+
412+
val destInnerFile = new File(destInnerDir, sourceFile.getName)
413+
assert(destInnerFile.isFile())
414+
415+
val filePath = new Path("file://" + sourceFile.getAbsolutePath)
416+
val testFileDir = new File("test-filename")
417+
val testFileName = "testFName"
418+
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
419+
Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
420+
conf, false, Some(testFileName))
421+
val newFileName = new File(testFileDir, testFileName)
422+
assert(newFileName.isFile())
414423
}
415424
}

0 commit comments

Comments (0)