Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -274,7 +275,21 @@ object InMemoryFileIndex extends Logging {
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
val statuses: Array[FileStatus] = try {
fs match {
// DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case _: DistributedFileSystem =>
Copy link
Member

@gatorsmile gatorsmile May 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@melin Are you hitting a perf regression? Do we need an internal SQLConf?

cc @JoshRosen

Copy link
Contributor

@JoshRosen JoshRosen May 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm misunderstanding something, I don't think that @melin's posted benchmark is an apples-to-apples comparison: listLocatedStatus is going to be slower than listStatus because it's doing more work in order to get location information. In order for this to be a fair performance comparison you'd want to compare listStatus + getBlockLocations to listLocatedStatus (because we'd end up having to call getBlockLocations further down if we didn't compute the locations as part of listLocatedStatus).

The direct comparison is only useful in cases where we're never going to make use of the location information and to my knowledge we currently do not have a flag to bypass locality fetching in InMemoryFileIndex (so the location-free performance isn't relevant).

Copy link
Member

@dongjoon-hyun dongjoon-hyun May 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1 for @JoshRosen 's comment.
And, Yep. Of course, if there is a corner case, we had better have conf for this. For now, it looks like we need more description about what he is hitting. SPARK JIRA would be good for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before coming up with this change I was actually wondering why there was no conf options to ignore locality and why it was required to do those extra lookups. In situations where the spark job is running on a small set of servers out of a big HDFS cluster, locality is a bit useless as the hit rate is super low, the time to get block locations was getting out of control for us and locality wait per task was slowing down the jobs.

I think this change to get the blocks from the namenode is good enough for us now but I can definitely see the benefit of having a conf option to disable locality entirely which would turn off fetching of getFileBlockLocations and also use listStatus always instead of my optimization to use listLocatedStatus.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please file a JIRA for the further discussions on new conf if it's not filed yet. (I didn't search it, but might be exist in some other context.)

having a conf option to disable locality entirely which would turn off fetching of getFileBlockLocations

val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
def hasNext(): Boolean = remoteIter.hasNext
}.toArray
case _ => fs.listStatus(path)
}
} catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
Expand Down