32 changes: 20 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -617,7 +617,8 @@ private[spark] object Utils extends Logging {
case _ =>
val fs = getHadoopFileSystem(uri, hadoopConf)
val path = new Path(uri)
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
Some(filename))
}
}

@@ -632,20 +633,27 @@ private[spark] object Utils extends Logging {
fs: FileSystem,
conf: SparkConf,
hadoopConf: Configuration,
fileOverwrite: Boolean): Unit = {
if (!targetDir.mkdir()) {
fileOverwrite: Boolean,
filename: Option[String] = None): Unit = {
if (!targetDir.exists() && !targetDir.mkdir()) {
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
fs.listStatus(path).foreach { fileStatus =>
val innerPath = fileStatus.getPath
if (fileStatus.isDir) {
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
if (fs.isDirectory(path)) {
fs.listStatus(path).foreach { fileStatus =>
val innerPath = fileStatus.getPath
if (fileStatus.isDir) {
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
}
}
} else {
val in = fs.open(path)
val targetFile = new File(targetDir, filename.getOrElse(path.getName))
downloadFile(path.toString, in, targetFile, fileOverwrite)
}
}

56 changes: 56 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -389,16 +389,30 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val tempDir = Utils.createTempDir()
val innerTempDir = Utils.createTempDir(tempDir.getPath)
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
val tempFile1 = File.createTempFile("someprefix1", "somesuffix1", innerTempDir)
val tempFile2 = File.createTempFile("someprefix2", "somesuffix2", innerTempDir)
val tempFile3 = File.createTempFile("someprefix3", "somesuffix3", tempDir)
val targetDir = new File("target-dir")
val renameTargetDir = new File("rename-target-dir")
val fileTargetDir = new File("file-target-dir")
val multiFileTargetDir = new File("multi-file-target-dir")
Files.write("some text", tempFile, UTF_8)
Files.write("some text", tempFile1, UTF_8)
Files.write("some text", tempFile2, UTF_8)

try {
val path = new Path("file://" + tempDir.getAbsolutePath)
val conf = new Configuration()
val fs = Utils.getHadoopFileSystem(path.toString, conf)
// Testing subdirs are copied across
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
assert(targetDir.exists())
assert(targetDir.isDirectory())
// Testing to make sure it doesn't error if the dir already exists
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
val newTempFile = new File(targetDir, tempFile3.getName)
Contributor:
I think this is where our patches diverge a bit.

The behavior I expect when I upload a directory "foo" is that there will be a directory called "foo" inside the target directory, so that when I do SparkFiles.get("foo") I get the location of that directory.

Your patch seems to place the contents of "foo" under the target dir, which is not what I'd expect.
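To make the expected behavior concrete, here is a minimal sketch (not part of this patch), assuming an active SparkContext sc and the recursive addFile overload; the HDFS path and file names are illustrative only:

import java.io.File
import org.apache.spark.SparkFiles

// Add a directory "foo" to be distributed to executors.
sc.addFile("hdfs:///data/foo", recursive = true)

// Expectation: SparkFiles.get("foo") resolves to <downloadDir>/foo, i.e. the
// directory itself is preserved under the target dir rather than its contents
// being flattened directly into the target dir.
val fooDir = new File(SparkFiles.get("foo"))
assert(fooDir.isDirectory)
assert(new File(fooDir, "part-00000").isFile)  // hypothetical file inside "foo"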

Author:
Ok, fair enough, the misunderstanding was on my part. I've tested your patch and it works in my case, as I expected. I'm happy to close this, since I likewise just want this solved for the next release.

assert(newTempFile.exists())
assert(newTempFile.isFile())
val newInnerDir = new File(targetDir, innerTempDir.getName)
println("inner temp dir: " + innerTempDir.getName)
targetDir.listFiles().map(_.getName).foreach(println)
@@ -407,9 +421,51 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newInnerFile = new File(newInnerDir, tempFile.getName)
assert(newInnerFile.exists())
assert(newInnerFile.isFile())



// Testing that you can copy a single file over
val filePath = new Path("file://" + tempFile.getAbsolutePath)
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
Utils.fetchHcfsFile(filePath, fileTargetDir, testFilefs, new SparkConf(),
conf, false)
val newFile = new File(fileTargetDir, tempFile.getName)
assert(newFile.exists())
assert(newFile.isFile())

// Testing that when copying a single file you can rename it
val testFileName = "testFName"
Utils.fetchHcfsFile(filePath, renameTargetDir, testFilefs,
new SparkConf(), conf, false, Some(testFileName))
val newFileName = new File(renameTargetDir, testFileName)
assert(newFileName.exists())
assert(newFileName.isFile())

// Testing that you can copy a dir with files in it and the filenames
// will be correct
val dirPath = new Path("file://" + innerTempDir.getAbsolutePath)
val testDirfs = Utils.getHadoopFileSystem(dirPath.toString, conf)
Utils.fetchHcfsFile(dirPath, multiFileTargetDir, testDirfs, new SparkConf(),
conf, false, Some(testFileName))
var newTmpFile1 = new File(multiFileTargetDir, tempFile.getName)
assert(newTmpFile1.exists())
assert(newTmpFile1.isFile())
var newTmpFile2 = new File(multiFileTargetDir, tempFile1.getName)
assert(newTmpFile2.exists())
assert(newTmpFile2.isFile())
var newTmpFile3 = new File(multiFileTargetDir, tempFile2.getName)
assert(newTmpFile3.exists())
assert(newTmpFile3.isFile())
// Testing that a combination of files and dirs within a dir copies correctly



} finally {
Utils.deleteRecursively(tempDir)
Utils.deleteRecursively(targetDir)
Utils.deleteRecursively(renameTargetDir)
Utils.deleteRecursively(fileTargetDir)
Utils.deleteRecursively(multiFileTargetDir)
}
}
}