
Commit 76aa45d

xwu0226 authored and liancheng committed
[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem
## What changes were proposed in this pull request?

##### The root cause:

When `DataSource.resolveRelation` builds a `ListingFileCatalog` object, `listLeafFiles` is invoked, and a list of `FileStatus` objects is retrieved from the provided path. These `FileStatus` objects include the directories for the partitions (`id=0` and `id=2` in the JIRA). However, `getFileBlockLocations` is then invoked on each of these `FileStatus` objects, and `DistributedFileSystem` does not allow that call on a directory, hence the exception.

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate the `LocatedFileStatus` objects.

## How was this patch tested?

Regression tests were run. Manual test:

```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

scala> spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```

I also tried it with two levels of partitioning. I have not found a way to add a test case to the unit test bucket that can exercise a real HDFS file location. Any suggestions will be appreciated.

Author: Xin Wu <[email protected]>

Closes #13463 from xwu0226/SPARK-14959.
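To make the root cause concrete, here is a minimal sketch of the listing pattern the fix moves to; `locatedLeafFiles` is a hypothetical helper name, not Spark's actual method. The key point is that block locations are requested only for plain files, and partition directories are recursed into instead.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}

// Hypothetical helper illustrating the fixed listing pattern: on HDFS,
// DistributedFileSystem.getFileBlockLocations throws when handed a directory,
// so directories must be recursed into, never passed to that call.
def locatedLeafFiles(path: Path, conf: Configuration): Seq[FileStatus] = {
  val fs = path.getFileSystem(conf)
  val (dirs, files) = fs.listStatus(path).toSeq.partition(_.isDirectory)
  val located = files.map {
    case f: LocatedFileStatus => f // already carries block locations
    case f =>
      // Safe here: f is guaranteed to be a plain file at this point.
      new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
  }
  // Recurse into partition directories such as id=0/ and id=2/.
  located ++ dirs.flatMap(d => locatedLeafFiles(d.getPath, conf))
}
```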
1 parent 6dde274 · commit 76aa45d

File tree

3 files changed: +14 -33 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala

Lines changed: 3 additions & 33 deletions
```diff
@@ -83,40 +83,10 @@ class ListingFileCatalog(
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
       logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-        //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }
-    }.filterNot { status =>
-      val name = status.getPath.getName
-      HadoopFsRelation.shouldFilterOut(name)
-    }
-
-    val (dirs, files) = statuses.partition(_.isDirectory)
-
-    // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-    if (dirs.isEmpty) {
-      mutable.LinkedHashSet(files: _*)
-    } else {
-      mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+        getOrElse(Array.empty)
     }
+    mutable.LinkedHashSet(statuses: _*)
   }
 }
```
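For context, a local-filesystem analogue of the manual HDFS test above can be run in `spark-shell`; the path below is illustrative and `spark` is the session provided by the shell. Writing with `partitionBy` produces the `id=...` partition directories whose listing previously triggered the exception.

```scala
// Illustrative path; `spark` is the SparkSession provided by spark-shell.
import spark.implicits._

val df = Seq(("hello", 0), ("world", 0), ("hello", 1), ("there", 1)).toDF("text", "id")
df.write.partitionBy("id").parquet("/tmp/SPARK-14959_part")

// The root now contains id=0/ and id=1/ directories; listing it used to call
// getFileBlockLocations on those directories, which HDFS rejects.
spark.read.format("parquet").load("/tmp/SPARK-14959_part").show()
```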

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -381,6 +381,16 @@ private[sql] object HadoopFsRelation extends Logging {
     }
     statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
       case f: LocatedFileStatus => f
+
+      // NOTE:
+      //
+      // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+      //   operations, calling `getFileBlockLocations` does no harm here since these file system
+      //   implementations don't actually issue RPC for this method.
+      //
+      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+      //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+      //   paths exceeds threshold.
       case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
     }
   }
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -490,6 +490,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
     val count = invocations.getAndAdd(1)
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }
```
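For reference, a self-contained sketch of the guarded test filesystem from this diff; the class name is hypothetical, and the invocation counter mirrors the field assumed in the surrounding suite.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// Mirrors LocalityTestFileSystem with the new guard: like HDFS, it fails
// fast if a directory ever reaches getFileBlockLocations.
class DirectoryRejectingFileSystem extends RawLocalFileSystem {
  private val invocations = new AtomicInteger(0)

  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    require(!file.isDirectory, "The file path can not be a directory.")
    val count = invocations.getAndAdd(1)
    // Fabricate one block location per call, as the original test fixture does.
    Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
  }
}
```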
