Commit 2a76ec1

[SPARK-17661][SQL] Consolidate various listLeafFiles implementations
1 parent 62ccf27 commit 2a76ec1

File tree

4 files changed (+200 / -226 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala

Lines changed: 166 additions & 61 deletions
@@ -17,15 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 
 /**
@@ -82,73 +83,177 @@ class ListingFileCatalog(
    * This is publicly visible for testing.
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
-    } else {
-      // Right now, the number of paths is less than the value of
-      // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
-      // If there is any child that has more files than the threshold, we will use parallel
-      // listing.
-
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(hadoopConf, this.getClass)
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-
-      val statuses: Seq[FileStatus] = paths.flatMap { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        logTrace(s"Listing $path on driver")
-
-        val childStatuses = {
-          try {
-            val stats = fs.listStatus(path)
-            if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-          } catch {
-            case _: FileNotFoundException =>
-              logWarning(s"The directory $path was not found. Was it deleted very recently?")
-              Array.empty[FileStatus]
-          }
-        }
+    val files =
+      if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+        ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+      } else {
+        ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+      }
+
+    mutable.LinkedHashSet(files: _*)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = paths.toSet.hashCode()
+}
+
+
+object ListingFileCatalog extends Logging {
 
-        childStatuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-          //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-          //   paths exceeds threshold.
-          case f =>
-            if (f.isDirectory ) {
-              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
-              f
-            } else {
-              HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+  // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play
+  // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
+  // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize
+  // it from executor side and reconstruct it on driver side.
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of path recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[FileStatus] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.flatMap { path =>
+      logTrace(s"Listing $path")
+      val fs = path.getFileSystem(hadoopConf)
+      listLeafFiles0(fs, fs.getFileStatus(path), filter)
+    }
+  }
+
+  /**
+   * List a collection of path recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[FileStatus] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Set the number of parallelism to prevent following file listing from generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+      }.map { status =>
+        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+        val blockLocations = status match {
+          case f: LocatedFileStatus =>
+            f.getBlockLocations.map { loc =>
+              SerializableBlockLocation(
+                loc.getNames,
+                loc.getHosts,
+                loc.getOffset,
+                loc.getLength)
             }
+
+          case _ =>
+            Array.empty[SerializableBlockLocation]
         }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
 
-      val (dirs, files) = statuses.partition(_.isDirectory)
+        SerializableFileStatus(
+          status.getPath.toString,
+          status.getLen,
+          status.isDirectory,
+          status.getReplication,
+          status.getBlockSize,
+          status.getModificationTime,
+          status.getAccessTime,
+          blockLocations)
+      }.collect()
 
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+    // Turn SerializableFileStatus back to Status
+    statuses.map { f =>
+      val blockLocations = f.blockLocations.map { loc =>
+        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
       }
+      new LocatedFileStatus(
+        new FileStatus(
+          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+        blockLocations)
     }
   }
 
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
-    case _ => false
+  /**
+   * List a single path, provided as a FileStatus, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, status: FileStatus, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing ${status.getPath}")
+    val name = status.getPath.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      val statuses = {
+        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      }
+      // statuses do not have any dirs.
+      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+        //   paths exceeds threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+          // which is very slow on some file system (RawLocalFileSystem, which is launch a
+          // subprocess and parse the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
   }
 
-  override def hashCode(): Int = paths.toSet.hashCode()
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // because Parquet needs to find those metadata files from leaf files returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+  }
 }
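
As a quick reference for the filtering rule added above, the following standalone sketch mirrors the `shouldFilterOut` predicate from this diff and applies it to a few hypothetical leaf-file names (the names are illustrative only, not taken from the commit):

object ShouldFilterOutSketch {
  // Mirrors ListingFileCatalog.shouldFilterOut above: drop names starting with "_" or ".",
  // except Parquet's summary files (_metadata, _common_metadata); "_"-prefixed names that
  // contain "=" are also kept.
  def shouldFilterOut(pathName: String): Boolean = {
    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical names, chosen only to exercise each branch of the predicate.
    val examples = Seq(
      "part-r-00000.gz.parquet", // kept: ordinary data file
      "_SUCCESS",                // dropped: starts with "_"
      "_metadata",               // kept: Parquet summary file
      "_common_metadata",        // kept: Parquet summary file
      ".DS_Store"                // dropped: starts with "."
    )
    examples.foreach(name => println(s"$name -> filteredOut = ${shouldFilterOut(name)}"))
  }
}

For context, whether a set of paths is listed serially on the driver or in parallel via `listLeafFilesInParallel` is decided by `parallelPartitionDiscoveryThreshold`; assuming the standard SQL conf key (not shown in this diff), that threshold is configured as `spark.sql.sources.parallelPartitionDiscovery.threshold`.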

0 commit comments
