@@ -21,11 +21,14 @@ import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration


/**
@@ -82,73 +85,183 @@ class ListingFileCatalog(
* This is publicly visible for testing.
*/
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
// Right now, the number of paths is less than the value of
// parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
// If there is any child that has more files than the threshold, we will use parallel
// listing.

// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logTrace(s"Listing $path on driver")

val childStatuses = {
try {
val stats = fs.listStatus(path)
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
} catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}
}
val files =
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
}

mutable.LinkedHashSet(files: _*)
}

override def equals(other: Any): Boolean = other match {
case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
case _ => false
}

override def hashCode(): Int = paths.toSet.hashCode()
}
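
The branch in listLeafFiles above is driven by spark.sql.sources.parallelPartitionDiscovery.threshold (surfaced as parallelPartitionDiscoveryThreshold). A minimal way to exercise the parallel path, assuming that config key, is to lower the threshold below the number of input paths; the paths below are placeholders, not part of this patch:

  // Sketch only: with the threshold at 2, listing two or more input paths goes
  // through the executor-side (parallel) branch of listLeafFiles.
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("parallel-listing-demo")
    .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "2")
    .getOrCreate()

  // Two input paths >= threshold of 2, so listing happens on the executors.
  spark.read.parquet("/tmp/example/part=1", "/tmp/example/part=2").count()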


object ListingFileCatalog extends Logging {

/** A serializable variant of HDFS's BlockLocation. */
private case class SerializableBlockLocation(
Contributor Author: I renamed this from "Fake" to "Serializable" to more accurately describe its purpose.

Contributor: +1

names: Array[String],
hosts: Array[String],
offset: Long,
length: Long)

/** A serializable variant of HDFS's FileStatus. */
private case class SerializableFileStatus(
path: String,
length: Long,
isDir: Boolean,
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])

/**
* List a collection of paths recursively.
*/
private def listLeafFilesInSerial(
paths: Seq[Path],
hadoopConf: Configuration): Seq[FileStatus] = {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val filter = FileInputFormat.getInputPathFilter(jobConf)

paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
listLeafFiles0(fs, path, filter)
}
}

childStatuses.map {
case f: LocatedFileStatus => f

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use `listLeafFilesInParallel` when the number of
// paths exceeds the threshold.
case f =>
if (f.isDirectory) {
// If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
f
} else {
HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
/**
* List a collection of paths recursively in parallel (using Spark executors).
* Each launched task uses [[listLeafFilesInSerial]] to list.
*/
private def listLeafFilesInParallel(
paths: Seq[Path],
hadoopConf: Configuration,
sparkSession: SparkSession): Seq[FileStatus] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")

val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)

// Cap the parallelism to prevent the following file listing from generating too many tasks
// when defaultParallelism is large.
val numParallelism = Math.min(paths.size, 10000)

val statuses = sparkContext
.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
Contributor Author: This function is very similar to the old listLeafFilesInParallel, except I replaced the code within this mapPartitions call with listLeafFilesInSerial.

Contributor: You shouldn't just call listLeafFilesInSerial here. It's more likely that one level down you're going to have a bunch more directories that you may want to list, where you want more parallelization. You should iteratively list subdirectories in parallel.

Contributor Author: This was following the old behavior.
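
A rough sketch of that iterative approach (illustrative only, not part of this patch): list one directory level per round, so each newly discovered layer of subdirectories is itself listed in parallel. It reuses this file's imports; listLeafFilePathsIteratively is a hypothetical name, and only plain (path, isDirectory) pairs are collected so nothing non-serializable crosses the wire.

  // Hypothetical sketch: repeat the parallel step level by level instead of
  // recursing serially inside each task.
  private def listLeafFilePathsIteratively(
      paths: Seq[Path],
      hadoopConf: Configuration,
      sparkSession: SparkSession): Seq[Path] = {
    val serializableConf = new SerializableConfiguration(hadoopConf)
    var pending: Seq[String] = paths.map(_.toString)
    val leafPaths = mutable.ArrayBuffer.empty[String]
    while (pending.nonEmpty) {
      val children = sparkSession.sparkContext
        .parallelize(pending, math.min(pending.size, 10000))
        .flatMap { p =>
          val path = new Path(p)
          val fs = path.getFileSystem(serializableConf.value)
          try {
            // Only the immediate children; directories are re-queued for the next round.
            fs.listStatus(path).map(s => (s.getPath.toString, s.isDirectory))
          } catch {
            case _: FileNotFoundException => Array.empty[(String, Boolean)]
          }
        }
        .collect()
      val (dirs, files) = children.partition(_._2)
      leafPaths ++= files.map(_._1)
      pending = dirs.map(_._1)
    }
    leafPaths.map(new Path(_))
  }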

}.map { status =>
Contributor: Why don't you just call map on the iterator instead of calling it on the RDD?

Contributor Author: This was pre-existing code.
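
For reference, the alternative the reviewer describes would fold the conversion into the mapPartitions closure instead of chaining a separate .map on the RDD, roughly like the sketch below (illustrative only; toSerializableFileStatus is a hypothetical helper standing in for the conversion shown just after this thread):

  // Hypothetical restructuring: map over the partition's iterator directly.
  val statuses = sparkContext
    .parallelize(serializedPaths, numParallelism)
    .mapPartitions { paths =>
      val hadoopConf = serializableConfiguration.value
      listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf)
        .iterator
        .map(toSerializableFileStatus)  // FileStatus => SerializableFileStatus
    }
    .collect()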

// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}

case _ =>
Array.empty[SerializableBlockLocation]
}
}.filterNot { status =>
val name = status.getPath.getName
HadoopFsRelation.shouldFilterOut(name)
}

val (dirs, files) = statuses.partition(_.isDirectory)
SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}.collect()

// It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
if (dirs.isEmpty) {
mutable.LinkedHashSet(files: _*)
} else {
mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
// Turn SerializableFileStatus back into FileStatus
statuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
blockLocations)
}
}

override def equals(other: Any): Boolean = other match {
case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
case _ => false
/**
* List the leaf files under a single path, recursively and in serial.
*/
private def listLeafFiles0(
Contributor Author: This is almost the same as the old HadoopFsRelation.listLeafFiles. The old code was:

  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    } else {
      val statuses = {
        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }

  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
    // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
    // very slow on some file system (RawLocalFileSystem, which is launch a subprocess and parse the
    // stdout).
    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
    if (f.isSymlink) {
      lfs.setSymlink(f.getSymlink)
    }
    lfs
  }

fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
logTrace(s"Listing $path")
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
} else {
// [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}

val allLeafStatuses = {
val (dirs, files) = statuses.partition(_.isDirectory)
val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
}

allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus =>
f

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use `listLeafFilesInParallel` when the number of
// paths exceeds the threshold.
case f =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file systems (RawLocalFileSystem, for instance, launches a
// subprocess and parses the stdout).
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
lfs
}
}
}

override def hashCode(): Int = paths.toSet.hashCode()
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
Contributor Author: This is identical to the old code in HadoopFsRelation.shouldFilterOut.

// We filter everything that starts with _ and ., except _common_metadata and _metadata
// because Parquet needs to find those metadata files from leaf files returned by this method.
// We should refactor this logic to not mix metadata files with data files.
((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
!pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
}
}
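
A few concrete cases of the filtering rule above, written as assertions (illustrative only):

  // Expected behavior of shouldFilterOut given the rule above.
  assert(ListingFileCatalog.shouldFilterOut("_SUCCESS"))          // job marker: filtered
  assert(ListingFileCatalog.shouldFilterOut(".part-00000.crc"))   // dot file: filtered
  assert(!ListingFileCatalog.shouldFilterOut("_metadata"))        // Parquet summary: kept
  assert(!ListingFileCatalog.shouldFilterOut("_common_metadata")) // Parquet summary: kept
  assert(!ListingFileCatalog.shouldFilterOut("part-00000"))       // data file: kept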