
Conversation

@rrusso2007
Contributor

@rrusso2007 rrusso2007 commented May 22, 2019

What changes were proposed in this pull request?

InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem. DistributedFileSystem overrides listLocatedStatus so that the listing is done with a single namenode call, saving thousands of calls to getBlockLocations.

Currently in InMemoryFileIndex, all directory listings are done using FileSystem.listStatus followed by individual calls to FileSystem.getFileBlockLocations. This is painfully slow for folders with large numbers of files, because the process happens serially and parallelism is applied only at the folder level, not the file level.

FileSystem also provides another API, listLocatedStatus, which returns LocatedFileStatus objects that already carry the block locations. In the base FileSystem class this simply delegates to listStatus and getFileBlockLocations, much as Spark does today. However, when HDFS is the backing file system, DistributedFileSystem overrides this method and makes a single call to the namenode to retrieve the directory listing together with the block locations. This avoids potentially thousands of calls to the namenode and is also more consistent: files either exist with locations or do not exist, instead of leaving a FileNotFoundException case to handle.

For our example directory with 6500 files, the load time of spark.read.parquet was reduced 96x, from 76 seconds to 0.8 seconds. The savings only grow with the number of files in the directory.

Rather than always using this method, which with the default FileSystem implementation could surface a FileNotFoundException that is hard to decipher, the pull request uses it only when the FileSystem is a DistributedFileSystem; otherwise the old logic still applies.
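
For readers who want the shape of the change, here is a rough Scala sketch of the idea (illustrative only, not the exact patch; the helper name is made up):

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem

// Sketch: list a directory, using the single-call listLocatedStatus only for HDFS.
def listWithLocations(fs: FileSystem, path: Path): Seq[FileStatus] = fs match {
  case dfs: DistributedFileSystem =>
    // One namenode round trip returns LocatedFileStatus entries that already
    // carry block locations, so no per-file getFileBlockLocations calls are needed.
    val iter = dfs.listLocatedStatus(path)
    val statuses = new ArrayBuffer[LocatedFileStatus]()
    while (iter.hasNext) {
      statuses += iter.next()
    }
    statuses.toSeq
  case _ =>
    // Any other file system keeps the old behavior: a plain listing, with block
    // locations fetched separately afterwards (as InMemoryFileIndex already does).
    fs.listStatus(path).toSeq
}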

How was this patch tested?

Ran the existing test suite.

@JoshRosen
Contributor

This is an awesome find 🎉 .

Fellow readers / reviewers, definitely check out the JIRA description, too, since it lists some impressive perf. numbers: https://issues.apache.org/jira/browse/SPARK-27801

JoshRosen added a commit to JoshRosen/spark that referenced this pull request May 22, 2019
@JoshRosen
Contributor

Is this behavior affected by Hadoop / HDFS version? For example, is the class DistributedFileSystem present in all Hadoop versions that Spark 3.x will support? Does DistributedFileSystem.listLocatedStatus() implement this performance optimization in all Hadoop versions, or do some versions fall back to making multiple internal calls (leaving us vulnerable to the file-deletion race condition that we discussed upthread)?

It's unfortunate that the bug fixed by #24668 is adding so many edge cases here 😦

@rrusso2007
Contributor Author

@JoshRosen I believe all supported versions of Hadoop use this version of listLocatedStatus. Based on the Spark documentation page that says only Hadoop 2.6.5+ is supported, I think we're good there.

@rrusso2007 rrusso2007 changed the title [SPARK-27801] InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem [SPARK-27801] Improve performance of InMemoryFileIndex.listLeafFiles for HDFS directories with many files May 23, 2019
@wangyum
Member

wangyum commented May 23, 2019

Thank you @rrusso2007 @JoshRosen. I did a simple benchmark in our production environment (Hadoop version 2.7.1):

19/05/22 19:53:18 WARN InMemoryFileIndex: Elements: 10. default time token: 41, SPARK-27801 time token: 18, SPARK-27807 time token: 30
19/05/22 19:53:29 WARN InMemoryFileIndex: Elements: 20. default time token: 22, SPARK-27801 time token: 10, SPARK-27807 time token: 24
19/05/22 19:53:30 WARN InMemoryFileIndex: Elements: 50. default time token: 47, SPARK-27801 time token: 13, SPARK-27807 time token: 25
19/05/22 19:53:33 WARN InMemoryFileIndex: Elements: 100. default time token: 54, SPARK-27801 time token: 10, SPARK-27807 time token: 30
19/05/22 19:53:42 WARN InMemoryFileIndex: Elements: 200. default time token: 86, SPARK-27801 time token: 19, SPARK-27807 time token: 40
19/05/22 19:53:52 WARN InMemoryFileIndex: Elements: 500. default time token: 254, SPARK-27801 time token: 30, SPARK-27807 time token: 90
19/05/22 19:54:06 WARN InMemoryFileIndex: Elements: 1000. default time token: 507, SPARK-27801 time token: 165, SPARK-27807 time token: 117
19/05/22 19:54:18 WARN InMemoryFileIndex: Elements: 2000. default time token: 1193, SPARK-27801 time token: 114, SPARK-27807 time token: 216
19/05/22 19:54:34 WARN InMemoryFileIndex: Elements: 5000. default time token: 2401, SPARK-27801 time token: 430, SPARK-27807 time token: 565
19/05/22 19:54:56 WARN InMemoryFileIndex: Elements: 10000. default time token: 4831, SPARK-27801 time token: 646, SPARK-27807 time token: 1202
19/05/22 19:55:40 WARN InMemoryFileIndex: Elements: 20000. default time token: 9121, SPARK-27801 time token: 1535, SPARK-27807 time token: 1920
19/05/22 19:56:45 WARN InMemoryFileIndex: Elements: 40000. default time token: 18873, SPARK-27801 time token: 2784, SPARK-27807 time token: 3997
19/05/22 19:58:18 WARN InMemoryFileIndex: Elements: 80000. default time token: 33658, SPARK-27801 time token: 6476, SPARK-27807 time token: 8326

@rrusso2007
Contributor Author

@cloud-fan @srowen

@dongjoon-hyun
Member

Retest this please.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-27801] Improve performance of InMemoryFileIndex.listLeafFiles for HDFS directories with many files [SPARK-27801][SQL] Improve performance of InMemoryFileIndex.listLeafFiles for HDFS directories with many files May 25, 2019
@SparkQA

SparkQA commented May 25, 2019

Test build #105781 has finished for PR 24672 at commit 4083017.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Merged to master.

I also tested this in the cluster and saw a lot of improvement.

Thank you for your first contribution, @rrusso2007. Welcome!
And thank you for the review, @JoshRosen, @srowen, @wangyum.

@melin

melin commented May 26, 2019

// Note: DataCenter and confLoader are the poster's internal helpers for Kerberos
// login and FileSystem lookup; Stopwatch and Lists are from Guava.
@RequestMapping(value = "v1/hdfsPref", produces = "text/plain;charset=UTF-8")
@ResponseBody
public String hdfsPref(String path) throws Exception {
    DataCenter dataCenter = new DataCenter("hangzhou", "杭州");
    UserGroupInformation userGroupInformation = confLoader.loginUserFromKeytab(dataCenter);
    return userGroupInformation.doAs(new PrivilegedExceptionAction<String>() {

        @Override
        public String run() throws Exception {
            FileSystem fileSystem = confLoader.getFileSystem(dataCenter);
            org.apache.hadoop.fs.Path filePath = new org.apache.hadoop.fs.Path(path);

            // Time a plain directory listing (no block locations).
            Stopwatch stopwatch = Stopwatch.createStarted();
            FileStatus[] fileStatuses = fileSystem.listStatus(filePath);
            stopwatch.stop();
            String msg = "listStatus: " + fileStatuses.length + ", times: " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms";

            // Time a located listing (block locations included in the result).
            stopwatch.reset().start();
            RemoteIterator<LocatedFileStatus> remoteIter = fileSystem.listLocatedStatus(filePath);
            List<LocatedFileStatus> myList = Lists.newArrayList();
            while (remoteIter.hasNext()) {
                myList.add(remoteIter.next());
            }
            stopwatch.stop();
            msg += "\nlistLocatedStatus: " + myList.size() + ", times: " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms";

            return msg;
        }
    });
}

test data:
listStatus: 8001, times: 114ms
listLocatedStatus: 8001, times: 807ms

@dongjoon-hyun
Member

Please file a JIRA with the context if you want to report something, @melin.

// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case _: DistributedFileSystem =>
Member

@gatorsmile gatorsmile May 27, 2019


@melin Are you hitting a perf regression? Do we need an internal SQLConf?

cc @JoshRosen

Contributor

@JoshRosen JoshRosen May 27, 2019


Unless I'm misunderstanding something, I don't think that @melin's posted benchmark is an apples-to-apples comparison: listLocatedStatus is going to be slower than listStatus because it's doing more work in order to get location information. In order for this to be a fair performance comparison you'd want to compare listStatus + getBlockLocations to listLocatedStatus (because we'd end up having to call getBlockLocations further down if we didn't compute the locations as part of listLocatedStatus).

The direct comparison is only useful in cases where we're never going to make use of the location information, and to my knowledge we currently do not have a flag to bypass locality fetching in InMemoryFileIndex (so the location-free performance isn't relevant).
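
To make that concrete, here is a sketch (in Scala, purely illustrative and not code from this PR) of what an apples-to-apples benchmark might look like, where both sides end up with block locations:

import java.util.concurrent.TimeUnit

import org.apache.hadoop.fs.{FileSystem, Path}

def fairComparison(fs: FileSystem, dir: Path): Unit = {
  // Old path: plain listing, then one getFileBlockLocations call per file.
  var start = System.nanoTime()
  val plain = fs.listStatus(dir)
  plain.foreach(f => fs.getFileBlockLocations(f, 0, f.getLen))
  println(s"listStatus + getFileBlockLocations: ${plain.length} files, " +
    s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)} ms")

  // New path: a single listing that already carries the block locations.
  start = System.nanoTime()
  val iter = fs.listLocatedStatus(dir)
  var count = 0
  while (iter.hasNext) {
    iter.next()
    count += 1
  }
  println(s"listLocatedStatus: $count files, " +
    s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)} ms")
}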

Member

@dongjoon-hyun dongjoon-hyun May 27, 2019


I'm +1 on @JoshRosen's comment.
And yes, of course, if there is a corner case we had better have a conf for this. For now, it looks like we need more description of what he is hitting; a SPARK JIRA would be good for that.

Contributor Author


Before coming up with this change I was actually wondering why there was no conf option to ignore locality and why it was required to do those extra lookups. In situations where the Spark job runs on a small set of servers out of a big HDFS cluster, locality is of little use: the hit rate is very low, the time to get block locations was getting out of control for us, and the locality wait per task was slowing down the jobs.

I think this change to get the blocks from the namenode is good enough for us for now, but I can definitely see the benefit of a conf option to disable locality entirely, which would turn off the getFileBlockLocations fetching and also always use listStatus instead of my optimization to use listLocatedStatus.
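
Purely as an illustration of what such an option might look like (hypothetical; the conf name below is made up and did not exist at the time of this discussion), the listing path could be guarded roughly like this:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Hypothetical flag; "spark.sql.sources.ignoreLocality" is an illustrative name only.
def listLeaf(spark: SparkSession, fs: FileSystem, path: Path): Seq[FileStatus] = {
  val ignoreLocality =
    spark.conf.get("spark.sql.sources.ignoreLocality", "false").toBoolean
  if (ignoreLocality) {
    // Skip block-location lookups entirely: plain listing, no locality information.
    fs.listStatus(path).toSeq
  } else {
    // Fall back to the DistributedFileSystem-aware listing sketched earlier in the thread.
    listWithLocations(fs, path)
  }
}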

Member


Please file a JIRA for further discussion of the new conf if one isn't filed yet. (I didn't search, but one might already exist in some other context.)

having a conf option to disable locality entirely which would turn off fetching of getFileBlockLocations

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…iles for HDFS directories with many files

Closes apache#24672 from rrusso2007/master.

Authored-by: rrusso2007 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ebd1431)