@@ -26,7 +26,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSink
@@ -168,10 +168,12 @@ object InMemoryFileIndex extends Logging {
       filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
+    val ignoreFileLocality = sparkSession.sparkContext.conf.get[Long](config.LOCALITY_WAIT) == 0L
Contributor (on the added `val ignoreFileLocality` line):
I think it'd be safer to check the more specific confs as well and only perform this optimization if they're all 0.

private[spark] val LOCALITY_WAIT_PROCESS = ConfigBuilder("spark.locality.wait.process")
  .fallbackConf(LOCALITY_WAIT)
private[spark] val LOCALITY_WAIT_NODE = ConfigBuilder("spark.locality.wait.node")
  .fallbackConf(LOCALITY_WAIT)
private[spark] val LOCALITY_WAIT_RACK = ConfigBuilder("spark.locality.wait.rack")
  .fallbackConf(LOCALITY_WAIT)

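A minimal sketch of the suggested combined guard (hypothetical code, not from the PR; it assumes the three entries above plus `LOCALITY_WAIT` are all exposed through `org.apache.spark.internal.config` and resolve to `Long` millisecond values):

// Hypothetical: skip the block-location lookup only when the global
// locality wait and each of its per-level fallbacks are zero.
val conf = sparkSession.sparkContext.conf
val ignoreFileLocality =
  Seq(config.LOCALITY_WAIT, config.LOCALITY_WAIT_PROCESS,
      config.LOCALITY_WAIT_NODE, config.LOCALITY_WAIT_RACK)
    .forall(entry => conf.get(entry) == 0L)

Since each specific conf falls back to `LOCALITY_WAIT`, checking all four only changes the outcome when a user has overridden one of the specific confs to a non-zero value.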

     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        (path, listLeafFiles(path, hadoopConf, filter, ignoreFileLocality, Some(sparkSession)))
       }
     }

@@ -204,7 +206,7 @@ object InMemoryFileIndex extends Logging {
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
+          (path, listLeafFiles(path, hadoopConf, filter, ignoreFileLocality, None))
         }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
@@ -267,6 +269,7 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
+      ignoreFileLocality: Boolean,
       sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
@@ -288,7 +291,8 @@ object InMemoryFileIndex extends Logging {
       case Some(session) =>
         bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
       case _ =>
-        dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        dirs.flatMap(dir =>
+          listLeafFiles(dir.getPath, hadoopConf, filter, ignoreFileLocality, sessionOpt))
     }
     val allFiles = topLevelFiles ++ nestedFiles
     if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
@@ -315,14 +319,19 @@ object InMemoryFileIndex extends Logging {
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
         try {
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
-            // Store BlockLocation objects to consume less memory
-            if (loc.getClass == classOf[BlockLocation]) {
-              loc
-            } else {
-              new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
-            }
-          }
+          val locations =
+            if (ignoreFileLocality) {
+              Array.empty[BlockLocation]
+            } else {
+              fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
+                // Store BlockLocation objects to consume less memory
+                if (loc.getClass == classOf[BlockLocation]) {
+                  loc
+                } else {
+                  new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
+                }
+              }
+            }
           val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
             f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
           if (f.isSymlink) {

Contributor (on the added `if (loc.getClass == classOf[BlockLocation])` line):

This part doesn't change in this PR. The new thing here is that we don't look up the block locations, but return an empty array instead.

Contributor:

Oh, sorry, you're right about that. Please disregard.
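For reference, a minimal sketch of how the new code path would be exercised from the user side (hypothetical usage, not part of this PR; the parquet path is made up):

// With the global locality wait set to 0, InMemoryFileIndex skips
// fs.getFileBlockLocations and records empty block locations per file.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.locality.wait", "0")
  .getOrCreate()

// Any file-based read now goes through the cheaper listing path.
val df = spark.read.parquet("/path/to/partitioned/table")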