@@ -106,16 +106,14 @@ class ListingFileCatalog(
106106
107107object ListingFileCatalog extends Logging {
108108
109- // `FileStatus` is Writable but not serializable. What make it worse, somehow it doesn't play
110- // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
111- // Here we use `SerializableFileStatus` to extract key components of a `FileStatus` to serialize
112- // it from executor side and reconstruct it on driver side.
109+ /** A serializable variant of HDFS's BlockLocation. */
113110 private case class SerializableBlockLocation (
114111 names : Array [String ],
115112 hosts : Array [String ],
116113 offset : Long ,
117114 length : Long )
118115
116+ /** A serializable variant of HDFS's FileStatus. */
119117 private case class SerializableFileStatus (
120118 path : String ,
121119 length : Long ,
@@ -137,17 +135,8 @@ object ListingFileCatalog extends Logging {
137135 val filter = FileInputFormat .getInputPathFilter(jobConf)
138136
139137 paths.flatMap { path =>
140- logTrace(s " Listing $path" )
141138 val fs = path.getFileSystem(hadoopConf)
142-
143- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
144- val status : Option [FileStatus ] = try Option (fs.getFileStatus(path)) catch {
145- case _ : FileNotFoundException =>
146- logWarning(s " The directory $path was not found. Was it deleted very recently? " )
147- None
148- }
149-
150- status.map(listLeafFiles0(fs, _, filter)).getOrElse(Seq .empty)
139+ listLeafFiles0(fs, path, filter)
151140 }
152141 }
153142
@@ -218,19 +207,28 @@ object ListingFileCatalog extends Logging {
218207 * List a single path, provided as a FileStatus, in serial.
219208 */
220209 private def listLeafFiles0 (
221- fs : FileSystem , status : FileStatus , filter : PathFilter ): Seq [FileStatus ] = {
222- logTrace(s " Listing ${status.getPath} " )
223- val name = status.getPath .getName.toLowerCase
210+ fs : FileSystem , path : Path , filter : PathFilter ): Seq [FileStatus ] = {
211+ logTrace(s " Listing $path " )
212+ val name = path .getName.toLowerCase
224213 if (shouldFilterOut(name)) {
225214 Seq .empty[FileStatus ]
226215 } else {
227- val statuses = {
228- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
229- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir, filter))
216+ // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
217+ // Note that statuses only include FileStatus for the files and dirs directly under path,
218+ // and does not include anything else recursively.
219+ val statuses = try fs.listStatus(path) catch {
220+ case _ : FileNotFoundException =>
221+ logWarning(s " The directory $path was not found. Was it deleted very recently? " )
222+ Array .empty[FileStatus ]
223+ }
224+
225+ val allLeafStatuses = {
226+ val (dirs, files) = statuses.partition(_.isDirectory)
227+ val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
230228 if (filter != null ) stats.filter(f => filter.accept(f.getPath)) else stats
231229 }
232- // statuses do not have any dirs.
233- statuses .filterNot(status => shouldFilterOut(status.getPath.getName)).map {
230+
231+ allLeafStatuses .filterNot(status => shouldFilterOut(status.getPath.getName)).map {
234232 case f : LocatedFileStatus =>
235233 f
236234
0 commit comments