[SPARK-21971][CORE] Too many open files in Spark due to concurrent files being opened #19184
Conversation
Hmm, shouldn't we just change the system config to increase the open file limit?

I ran into this with a limit of 32K. Setting it to "unlimited" is another possible workaround, but that may not be preferable in production systems. For example, with Q67 I observed 9000+ spill files in a single task, and with multiple tasks per executor it easily reached the limit.
if (this.din == null) {
  // Good time to init (if all files are opened, we can get Too Many files exception)
  initStreams();
}
Can this solve the too many open files issue? When we merge the readers, all the readers in the priority queue may still have records and be asked for records (so their files stay open). You can still hit the too many open files issue.
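For illustration, here is a minimal sketch of the merge pattern under discussion (the SpillRecordReader interface and KWayMergeSketch class are hypothetical, not Spark's actual code); it shows why every reader in the priority queue ends up with its underlying file open at the same time.

import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Rough sketch (hypothetical interface, not Spark's actual classes) of why a
// priority-queue merge keeps every spill file open: each reader in the queue
// must hold a buffered record for comparison, so its stream is already open.
interface SpillRecordReader {
  boolean advance();       // buffer the next record; false once the file is drained
  long currentKey();       // sort key of the buffered record
  Object currentRecord();  // the buffered record itself
  void close();            // releases the underlying file descriptor
}

final class KWayMergeSketch {
  static void merge(List<SpillRecordReader> readers, Consumer<Object> emit) {
    PriorityQueue<SpillRecordReader> queue = new PriorityQueue<>(
        (a, b) -> Long.compare(a.currentKey(), b.currentKey()));
    for (SpillRecordReader r : readers) {
      // Buffering the first record opens the file; with N spills,
      // N descriptors are held at the same time.
      if (r.advance()) {
        queue.add(r);
      } else {
        r.close();
      }
    }
    while (!queue.isEmpty()) {
      SpillRecordReader smallest = queue.poll();
      emit.accept(smallest.currentRecord());
      if (smallest.advance()) {
        queue.add(smallest);  // still has records, so its file stays open
      } else {
        smallest.close();     // only closed once fully drained
      }
    }
  }
}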
Good point. The PR has been tested with queries involving window functions (e.g. Q67), for which it worked fine.
During spill merges (especially getSortedIterator), it is still possible to encounter the too many open files issue.
I think you first need to describe in more detail, in the PR description, how this issue is fixed.
I agree with @viirya. We're using a priority queue to do the merge sort, which means all the readers in the priority queue end up open, so this still cannot solve the issue.
I think a valid fix is to control the number of concurrently merged files, like MR's io.sort.factor.
We also still need to address the similar issue in ExternalSorter and other places in shuffle.
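A rough sketch of that suggestion (the SpillFile type and helper names are hypothetical, not a Spark API): merge in passes so that at most a configurable number of spill files is open at any point in time, similar to MapReduce's io.sort.factor.

import java.util.ArrayList;
import java.util.List;

// Sketch of a capped multi-pass merge: files beyond `mergeFactor` are merged
// into intermediate spill files, so at most `mergeFactor` descriptors are
// open at once.
final class CappedMergeSketch {
  static SpillFile mergeAll(List<SpillFile> spills, int mergeFactor) {
    if (spills.isEmpty()) {
      throw new IllegalArgumentException("no spill files to merge");
    }
    List<SpillFile> current = new ArrayList<>(spills);
    while (current.size() > 1) {
      List<SpillFile> next = new ArrayList<>();
      for (int i = 0; i < current.size(); i += mergeFactor) {
        // Merge at most `mergeFactor` files into one intermediate spill file,
        // closing them before the next group is opened.
        List<SpillFile> group =
            current.subList(i, Math.min(i + mergeFactor, current.size()));
        next.add(mergeGroup(group));
      }
      current = next;
    }
    return current.get(0);
  }

  // Placeholder: a k-way merge of a small group into a new on-disk spill file.
  static SpillFile mergeGroup(List<SpillFile> group) {
    throw new UnsupportedOperationException("sketch only");
  }

  // Placeholder type standing in for an on-disk spill file.
  static final class SpillFile {}
}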
IIUC, this PR does not reduce the total number of open files. Since it opens files only when they are required, it may reduce the likelihood of a too many open files error.
As @viirya pointed out, it is necessary to provide a feature that controls the number of files open at one point (e.g. in the priority queue).
A valid fix should introduce a new config to control the number of concurrently opened spill files, which also means using some data structure to keep and track requests to open spill files.
Test build #81614 has finished for PR 19184 at commit
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}

try (InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
Please add a comment here saying that we don't need to hold the file open until we actually want to load the records, so that we can partially prevent the too many open files issue.
this.blockId = blockId;
this.serializerManager = serializerManager;

logger.debug("bufSize: {}, file: {}, records: {}", buffSize, file, this.numRecords);
Is this log useful? If there are many spill readers, I guess we don't want to see this many log lines?
taskContext.killTaskIfInterrupted();
}
if (this.din == null) {
  // Good time to init (if all files are opened, we can get Too Many files exception)
This comment looks confusing. Maybe: "It is time to initialize and hold the input stream of the spill file for loading records. Keeping the input stream open too early will very likely lead to the too many open files issue."
Thanks @viirya. I have updated the patch to address your comments. This fixes the "too many open files" issue for queries involving window functions (e.g. Q67, Q72, Q14), but the issue still needs to be addressed for the merger. Agreed that this is a partial patch.
@rajeshbalamohan Thanks for updating. I think we need a complete fix instead of a partial one, as the previous comments from the reviewers @jerryshao @kiszk @jiangxb1987 suggested. Can you try to fix this according to the comments? Thanks.
Test build #81628 has finished for PR 19184 at commit
@viirya @jerryshao To take a step back here: this specific issue applies to window operations, not to shuffle. In shuffle, a much larger volume of data is written per file, versus 4k records per file for a window operation. To get to 9k files with shuffle, you are typically processing a TB or more of data per shuffle task (unless the executor is starved of memory and spilled a large number of files). On the other hand, with the 4k window threshold (the default in Spark), getting to 9k files is possible within a single task. From what I see, there is actually no functional/performance reason to keep all the files open, unlike in shuffle. While getting it fixed for all cases would be ideal, the solution for window operations does not transfer to shuffle (and vice versa) due to the difference in how files are used in each. In case I missed something here, please let me know.
Hi @mridulm, sorry for the late response. I agree with you that the scenario here is different from shuffle, but the underlying structure and solution for spilling data are the same, so the problem is the same. On the shuffle side we can increase the memory size to hold more data before spilling and so avoid too many spills, but as you mentioned we cannot do that here. Yes, it is not necessary to open all the files beforehand. But since we're using a priority queue to do the merge sort, it is very likely that all the file handles end up open, so this fix only reduces the chance of hitting the too many open files issue. Maybe we can call this an interim fix; what do you think?
@jerryshao Actually the second half of your comment is not valid in this case.
The primary use case of this PR is for … So this fix is orthogonal to whether we improve sort shuffle or not - the requirement is to get to all tuples. If/when we do improve merge sort, …
After discussing with @mridulm offline: though the patch here cannot address the issue of … So this fix could solve the problem of … What do you think, @viirya @kiszk @maropu @jiangxb1987?
Thanks to @jerryshao for pointing me to SPARK-21595. Given the changes in #18843, this PR is no longer relevant. In your tests, you can set the threshold to 512M, since that is the value going forward in Spark 2.3.
Thanks @jerryshao and @mridulm for investigating this further. It is very reasonable. I think we don't need this fix, as the spill won't be too frequent in window operations now.
Thanks @mridulm, @jerryshao, @viirya. Closing this PR.
What changes were proposed in this pull request?
In UnsafeExternalSorter::getIterator(), a file is opened in UnsafeSorterSpillReader for every spillWriter, and these files are only closed later as part of the close() call.
However, when a large number of spill files are present, the number of open files grows to a great extent and ends up throwing a "Too many open files" exception.
This can easily be reproduced with TPC-DS Q67 at 1 TB scale on a multi-node cluster with multiple cores per executor.
There are ways to reduce the number of spill files generated by Q67, e.g. increasing "spark.sql.windowExec.buffer.spill.threshold" (4096 is the default). Another option is to raise the ulimit to a much higher value.
But those are workarounds.
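For reference, the threshold workaround could be applied roughly like this (the value 1048576 is only an illustrative choice, not a recommendation):

import org.apache.spark.sql.SparkSession;

public final class WindowSpillThresholdWorkaround {
  public static void main(String[] args) {
    // Illustrative workaround only: raise the window spill threshold
    // (default 4096 rows) so each task produces fewer spill files.
    SparkSession spark = SparkSession.builder()
        .appName("tpcds-q67")
        .config("spark.sql.windowExec.buffer.spill.threshold", "1048576")
        .getOrCreate();
    // ... run the workload ...
    spark.stop();
  }
}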
This PR reduces the number of files that are kept open in UnsafeSorterSpillReader.
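A simplified sketch of the idea (hypothetical class and method names, not the actual UnsafeSorterSpillReader code): the reader remembers the spill file in its constructor and only opens a stream when records are first requested.

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Sketch of lazy stream initialization: the constructor only records the file,
// and the stream is opened on the first read instead of up front.
final class LazySpillReaderSketch {
  private final File file;
  private DataInputStream din;  // stays null until records are first requested

  LazySpillReaderSketch(File file) {
    this.file = file;
  }

  int readNextInt() throws IOException {
    if (din == null) {
      // Defer opening until records are actually needed, so thousands of
      // readers created up front do not all hold file descriptors.
      din = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
    }
    return din.readInt();
  }

  void close() throws IOException {
    if (din != null) {
      din.close();
      din = null;
    }
  }
}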
How was this patch tested?
Manual testing of Q67 at 1 TB and 10 TB scale on a multi-node cluster.