From 5e873cb67513cd2b9906afac7a0bec20ad217677 Mon Sep 17 00:00:00 2001
From: "wangguangxin.cn"
Date: Wed, 20 Mar 2019 23:59:20 +0800
Subject: [PATCH] ignore file locality in InMemoryFileIndex

---
 .../datasources/InMemoryFileIndex.scala       | 29 ++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index fe418e610da8f..16c06a4059c61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSink
@@ -168,10 +168,12 @@ object InMemoryFileIndex extends Logging {
       filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
+    val ignoreFileLocality = sparkSession.sparkContext.conf.get[Long](config.LOCALITY_WAIT) == 0L
+
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        (path, listLeafFiles(path, hadoopConf, filter, ignoreFileLocality, Some(sparkSession)))
       }
     }
 
@@ -204,7 +206,7 @@ object InMemoryFileIndex extends Logging {
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
+          (path, listLeafFiles(path, hadoopConf, filter, ignoreFileLocality, None))
         }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
@@ -267,6 +269,7 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
+      ignoreFileLocality: Boolean,
       sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
@@ -288,7 +291,8 @@ object InMemoryFileIndex extends Logging {
       case Some(session) =>
         bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
       case _ =>
-        dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        dirs.flatMap(dir =>
+          listLeafFiles(dir.getPath, hadoopConf, filter, ignoreFileLocality, sessionOpt))
     }
     val allFiles = topLevelFiles ++ nestedFiles
     if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
@@ -315,14 +319,19 @@ object InMemoryFileIndex extends Logging {
           // which is very slow on some file system (RawLocalFileSystem, which is launch a
           // subprocess and parse the stdout).
           try {
-            val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
-              // Store BlockLocation objects to consume less memory
-              if (loc.getClass == classOf[BlockLocation]) {
-                loc
+            val locations =
+              if (ignoreFileLocality) {
+                Array.empty[BlockLocation]
               } else {
-                new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
+                fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
+                  // Store BlockLocation objects to consume less memory
+                  if (loc.getClass == classOf[BlockLocation]) {
+                    loc
+                  } else {
+                    new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
+                  }
+                }
               }
-            }
             val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
               f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
             if (f.isSymlink) {
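
Below is a minimal sketch (not part of the patch) of how the changed behavior would be exercised from an application. It assumes the patch above is applied; the object name and the input path are hypothetical, while spark.locality.wait and the Parquet reader are standard Spark APIs. With spark.locality.wait set to 0, the ignoreFileLocality flag introduced above becomes true and listLeafFiles skips fs.getFileBlockLocations while listing files.

import org.apache.spark.sql.SparkSession

object IgnoreFileLocalityExample {  // hypothetical example app, not part of the patch
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ignore-file-locality-example")
      .master("local[*]")
      // Locality wait of 0 is the condition the patch checks (config.LOCALITY_WAIT), so block
      // locations are not fetched while building the InMemoryFileIndex.
      .config("spark.locality.wait", "0")
      .getOrCreate()

    // Any file-based read lists its input paths through InMemoryFileIndex; with the patch
    // applied and locality wait disabled, no per-file block-location lookups are issued.
    val df = spark.read.parquet("/data/events")  // hypothetical path
    println(df.count())

    spark.stop()
  }
}

The trade-off: skipping getFileBlockLocations drops locality information that the scheduler would largely ignore anyway when spark.locality.wait is 0, and it avoids a per-file metadata call that can be expensive on file systems where that call is remote.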