diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index db55a06c10822..bd2a44e0b8770 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.FileStreamSink import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.ThreadUtils.parmap /** @@ -297,7 +298,7 @@ object InMemoryFileIndex extends Logging { val missingFiles = mutable.ArrayBuffer.empty[String] val filteredLeafStatuses = allLeafStatuses.filterNot( status => shouldFilterOut(status.getPath.getName)) - val resolvedLeafStatuses = filteredLeafStatuses.flatMap { + val resolvedLeafStatuses = parmap(filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8) { case f: LocatedFileStatus => Some(f) @@ -334,7 +335,7 @@ object InMemoryFileIndex extends Logging { missingFiles += f.getPath.toString None } - } + }.flatten if (missingFiles.nonEmpty) { logWarning(