Skip to content

Commit 6e9f069

Browse files
author
trystanleftwich
committed
[SPARK-6144] When in cluster mode, using ADD JAR with an hdfs:// sourced jar will fail
1 parent 8446ad0 commit 6e9f069

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -617,7 +617,8 @@ private[spark] object Utils extends Logging {
617617
case _ =>
618618
val fs = getHadoopFileSystem(uri, hadoopConf)
619619
val path = new Path(uri)
620-
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
620+
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
621+
Some(filename))
621622
}
622623
}
623624

@@ -632,8 +633,9 @@ private[spark] object Utils extends Logging {
632633
fs: FileSystem,
633634
conf: SparkConf,
634635
hadoopConf: Configuration,
635-
fileOverwrite: Boolean): Unit = {
636-
if (!targetDir.mkdir()) {
636+
fileOverwrite: Boolean,
637+
filename: Option[String] = None): Unit = {
638+
if (!targetDir.exists() && !targetDir.mkdir()) {
637639
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
638640
}
639641
fs.listStatus(path).foreach { fileStatus =>
@@ -643,7 +645,7 @@ private[spark] object Utils extends Logging {
643645
fileOverwrite)
644646
} else {
645647
val in = fs.open(innerPath)
646-
val targetFile = new File(targetDir, innerPath.getName)
648+
val targetFile = new File(targetDir, filename.getOrElse(innerPath.getName))
647649
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
648650
}
649651
}

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -390,6 +390,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
390390
val innerTempDir = Utils.createTempDir(tempDir.getPath)
391391
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
392392
val targetDir = new File("target-dir")
393+
val testFileDir = new File("test-filename")
393394
Files.write("some text", tempFile, UTF_8)
394395

395396
try {
@@ -399,6 +400,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
399400
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
400401
assert(targetDir.exists())
401402
assert(targetDir.isDirectory())
403+
// Testing to make sure it doesn't error if the dir already exists
404+
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
402405
val newInnerDir = new File(targetDir, innerTempDir.getName)
403406
println("inner temp dir: " + innerTempDir.getName)
404407
targetDir.listFiles().map(_.getName).foreach(println)
@@ -407,9 +410,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
407410
val newInnerFile = new File(newInnerDir, tempFile.getName)
408411
assert(newInnerFile.exists())
409412
assert(newInnerFile.isFile())
413+
val filePath = new Path("file://" + tempFile.getAbsolutePath)
414+
val testFileName = "testFName"
415+
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
416+
Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
417+
conf, false, Some(testFileName))
418+
val newFileName = new File(testFileDir, testFileName)
419+
assert(newFileName.exists())
420+
assert(newFileName.isFile())
410421
} finally {
411422
Utils.deleteRecursively(tempDir)
412423
Utils.deleteRecursively(targetDir)
424+
Utils.deleteRecursively(testFileDir)
413425
}
414426
}
415427
}

0 commit comments

Comments (0)