From 40830175b983e71354dfcea79ea02ab7dca1e43e Mon Sep 17 00:00:00 2001 From: rrusso2007 Date: Tue, 21 May 2019 16:42:51 -0700 Subject: [PATCH] Use listLocatedFiles for DistributedFileSystem instead of listFiles with followup listFileBlockLocations --- .../datasources/InMemoryFileIndex.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index db55a06c10822..45cf924266257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -23,6 +23,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ +import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.SparkContext @@ -274,7 +275,21 @@ object InMemoryFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { + val statuses: Array[FileStatus] = try { + fs match { + // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode + // to retrieve the file status with the file block location. The reason to still fallback + // to listStatus is because the default implementation would potentially throw a + // FileNotFoundException which is better handled by doing the lookups manually below. + case _: DistributedFileSystem => + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + def hasNext(): Boolean = remoteIter.hasNext + }.toArray + case _ => fs.listStatus(path) + } + } catch { case _: FileNotFoundException => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus]