diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index db55a06c10822..45cf924266257 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.SparkContext
@@ -274,7 +275,21 @@ object InMemoryFileIndex extends Logging {
     // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
-    val statuses = try fs.listStatus(path) catch {
+    val statuses: Array[FileStatus] = try {
+      fs match {
+        // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
+        // to retrieve the file status with the file block location. The reason to still fallback
+        // to listStatus is because the default implementation would potentially throw a
+        // FileNotFoundException which is better handled by doing the lookups manually below.
+        case _: DistributedFileSystem =>
+          val remoteIter = fs.listLocatedStatus(path)
+          new Iterator[LocatedFileStatus]() {
+            def next(): LocatedFileStatus = remoteIter.next
+            def hasNext(): Boolean = remoteIter.hasNext
+          }.toArray
+        case _ => fs.listStatus(path)
+      }
+    } catch {
       case _: FileNotFoundException =>
         logWarning(s"The directory $path was not found. Was it deleted very recently?")
         Array.empty[FileStatus]
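
For context on the technique the patch uses: `FileSystem.listLocatedStatus` returns a Hadoop `RemoteIterator[LocatedFileStatus]`, and on HDFS (`DistributedFileSystem`) each element already carries its block locations, so the listing and the block-location lookup collapse into one namenode round trip. The patch drains that iterator by wrapping it in a Scala `Iterator` and calling `.toArray`. Below is a minimal standalone sketch of the same adapter pattern, outside Spark; the `hdfs:///tmp/example` path and the `ListLocatedStatusExample` object are hypothetical, and it assumes a reachable HDFS and the Hadoop client libraries on the classpath.

```scala
// Illustrative sketch (not part of the patch): adapt a Hadoop RemoteIterator
// to a Scala Iterator so it can be drained with standard collection methods.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}

object ListLocatedStatusExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("hdfs:///tmp/example") // hypothetical path
    val fs = path.getFileSystem(conf)

    // On DistributedFileSystem this single call returns file statuses that
    // already include block locations, avoiding a per-file follow-up call.
    val remoteIter = fs.listLocatedStatus(path)

    // Wrap the RemoteIterator in a Scala Iterator (mirroring the patch) and
    // materialize it into an array.
    val statuses: Array[LocatedFileStatus] =
      new Iterator[LocatedFileStatus] {
        def next(): LocatedFileStatus = remoteIter.next()
        def hasNext: Boolean = remoteIter.hasNext
      }.toArray

    statuses.foreach { s =>
      println(s"${s.getPath} -> ${s.getBlockLocations.length} block location(s)")
    }
  }
}
```

The fallback to `fs.listStatus(path)` for non-HDFS filesystems is kept because the generic `listLocatedStatus` implementation may throw `FileNotFoundException` mid-iteration, whereas the existing code path handles missing entries by looking them up manually.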