
Commit 1fbca41

[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

This PR encodes the file name when the file server builds the download URL and decodes it again in Utils.fetchFile, so that files whose names contain special characters (such as spaces) can be fetched.

Author: Shixiong Zhu <[email protected]>
Closes #10208 from zsxwing/uri.
(cherry picked from commit 86e405f)
Signed-off-by: Shixiong Zhu <[email protected]>

1 parent 41ad8ac
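
For context on the failure this fixes: java.net.URI rejects unencoded characters such as spaces, so a download URL built by plain string concatenation cannot be parsed on the fetching side. A minimal sketch of the failure in Scala (the host and file name below are illustrative, not taken from the commit):

import java.net.{URI, URISyntaxException}

// A URL assembled by naive concatenation, as the old addFile methods did
// (host, port and file name are made up for illustration).
val url = "spark://localhost:1234/files/file name"

try {
  new URI(url)  // throws: a raw space is illegal in a URI path
} catch {
  case e: URISyntaxException => println(s"rejected: ${e.getMessage}")
}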

File tree

5 files changed (+46, -6 lines)


core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 3 additions & 3 deletions

@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(

  def addFile(file: File) : String = {
    addFileToDir(file, fileDir)
-    serverUri + "/files/" + file.getName
+    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
  }

  def addJar(file: File) : String = {
    addFileToDir(file, jarDir)
-    serverUri + "/jars/" + file.getName
+    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
  }

  def addFileToDir(file: File, dir: File) : String = {

@@ -80,7 +80,7 @@ private[spark] class HttpFileServer(
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    Files.copy(file, new File(dir, file.getName))
-    dir + "/" + file.getName
+    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
  }

}
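
The effect of this change shows up with a file whose name contains a space. A rough sketch of the before/after behavior, using a made-up serverUri value and assuming org.apache.spark.util.Utils is in scope as it is in this file:

// Made-up values for illustration.
val serverUri = "http://192.168.1.1:50000"
val name = "file name"

// Old behavior: the returned URL contains a raw space and cannot be parsed as a URI.
val before = serverUri + "/files/" + name
// "http://192.168.1.1:50000/files/file name"

// New behavior: the name is percent-encoded first.
val after = serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(name)
// "http://192.168.1.1:50000/files/file%20name" -- new java.net.URI(after) parses fine.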

core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala

Lines changed: 3 additions & 2 deletions

@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.server.StreamManager
import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils

/**
 * StreamManager implementation for serving files from a NettyRpcEnv.

@@ -51,13 +52,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)

  override def addFile(file: File): String = {
    require(files.putIfAbsent(file.getName(), file) == null,
      s"File ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
  }

  override def addJar(file: File): String = {
    require(jars.putIfAbsent(file.getName(), file) == null,
      s"JAR ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
  }

}

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 25 additions & 1 deletion

@@ -318,6 +318,30 @@ private[spark] object Utils extends Logging {
  }

  /**
+   * A file name may contain some invalid URI characters, such as " ". This method will convert the
+   * file name to a raw path accepted by `java.net.URI(String)`.
+   *
+   * Note: the file name must not contain "/" or "\"
+   */
+  def encodeFileNameToURIRawPath(fileName: String): String = {
+    require(!fileName.contains("/") && !fileName.contains("\\"))
+    // `file` and `localhost` are not used. Just to prevent URI from parsing `fileName` as
+    // scheme or host. The prefix "/" is required because URI doesn't accept a relative path.
+    // We should remove it after we get the raw path.
+    new URI("file", null, "localhost", -1, "/" + fileName, null, null).getRawPath.substring(1)
+  }
+
+  /**
+   * Get the file name from uri's raw path and decode it. If the raw path of uri ends with "/",
+   * return the name before the last "/".
+   */
+  def decodeFileNameInURI(uri: URI): String = {
+    val rawPath = uri.getRawPath
+    val rawFileName = rawPath.split("/").last
+    new URI("file:///" + rawFileName).getPath.substring(1)
+  }
+
+  /**
   * Download a file or directory to target directory. Supports fetching the file in a variety of
   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible

@@ -338,7 +362,7 @@ private[spark] object Utils extends Logging {
      hadoopConf: Configuration,
      timestamp: Long,
      useCache: Boolean) {
-    val fileName = url.split("/").last
+    val fileName = decodeFileNameInURI(new URI(url))
    val targetFile = new File(targetDir, fileName)
    val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
    if (useCache && fetchCacheEnabled) {
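
The two new helpers are designed to round-trip: the file servers encode a bare file name into a raw URI path segment, and fetchFile decodes the name back out of the full URL. A small sketch of the round trip (the spark:// URL below is illustrative), consistent with the tests added further down:

import java.net.URI

val original = "abc xyz"

// Server side: encode the name before appending it to the URL ("abc%20xyz").
val encoded = Utils.encodeFileNameToURIRawPath(original)
val url = s"spark://localhost:1234/files/$encoded"

// Fetching side: Utils.fetchFile now recovers the original name from the URL.
val decoded = Utils.decodeFileNameInURI(new URI(url))
assert(decoded == original)  // "abc xyz"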

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 4 additions & 0 deletions

@@ -771,12 +771,15 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
    val tempDir = Utils.createTempDir()
    val file = new File(tempDir, "file")
    Files.write(UUID.randomUUID().toString(), file, UTF_8)
+    val fileWithSpecialChars = new File(tempDir, "file name")
+    Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
    val empty = new File(tempDir, "empty")
    Files.write("", empty, UTF_8);
    val jar = new File(tempDir, "jar")
    Files.write(UUID.randomUUID().toString(), jar, UTF_8)

    val fileUri = env.fileServer.addFile(file)
+    val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
    val emptyUri = env.fileServer.addFile(empty)
    val jarUri = env.fileServer.addJar(jar)

@@ -786,6 +789,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {

    val files = Seq(
      (file, fileUri),
+      (fileWithSpecialChars, fileWithSpecialCharsUri),
      (empty, emptyUri),
      (jar, jarUri))
    files.foreach { case (f, uri) =>

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 11 additions & 0 deletions

@@ -734,4 +734,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
      conf.set("spark.executor.instances", "0")) === true)
  }

+  test("encodeFileNameToURIRawPath") {
+    assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
+    assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")
+    assert(Utils.encodeFileNameToURIRawPath("abc:xyz") === "abc:xyz")
+  }
+
+  test("decodeFileNameInURI") {
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc/xyz")) === "xyz")
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc")) === "abc")
+    assert(Utils.decodeFileNameInURI(new URI("files:///abc%20xyz")) === "abc xyz")
+  }
}
