[SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery #7396
Conversation
cc @marmbrus
Test build #37231 has finished for PR 7396 at commit
Test build #37232 has finished for PR 7396 at commit
Test build #37233 has finished for PR 7396 at commit
One of the test failures above is legitimate, which was caused by making
retest please
retest this please
Test build #37287 has finished for PR 7396 at commit
Test build #37325 has finished for PR 7396 at commit
Should this be a per-relation option?
There is one, defined in object `ParquetRelation2` and named `mergeSchema`.
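For readers hunting for that knob, a usage sketch. The per-relation option name matches the comment above; the global configuration key shown is the one this behavior surfaces through in released Spark versions. This needs a live `SQLContext`, so treat it as an illustrative fragment rather than a self-contained example:

```scala
// Re-enable Parquet schema merging for a single relation via the data
// source option named in the comment above:
val df = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("/path/to/parquet/table")

// Or change the global default through the SQL configuration:
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
```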
retest this please
Test build #37441 has finished for PR 7396 at commit
- Removes some dead code
- Parallelizes input path listing
Test build #37492 has finished for PR 7396 at commit
Test build #37490 has finished for PR 7396 at commit
Thanks, merging to master!
This PR tries to accelerate Parquet schema discovery and `HadoopFsRelation` partition discovery. The acceleration is done by the following means:

1. **Turning off schema merging by default**

   Schema merging is not the most common case, but it requires reading the footers of all Parquet part-files and can be very slow.
2. **Avoiding `FileSystem.globStatus()` calls when possible**

   `FileSystem.globStatus()` may issue multiple synchronous RPC calls and can be very slow (especially on S3). This PR adds `SparkHadoopUtil.globPathIfNecessary()`, which only issues RPC calls when the path contains glob-pattern specific character(s) (`{}[]*?\`).

   This is especially useful when converting a metastore Parquet table with lots of partitions, since Spark SQL adds all partition directories as input paths, and currently we do a `globStatus` call on each input path sequentially.
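The core of the glob-avoidance idea can be sketched in a few lines. This is a minimal illustration, not Spark's actual `SparkHadoopUtil.globPathIfNecessary()` implementation; the `glob` callback stands in for the real `FileSystem.globStatus()` call:

```scala
// Only pay the (potentially expensive) glob RPC when the path can actually
// be a glob pattern. Names here are illustrative.
object GlobPaths {
  // Glob-pattern specific characters, as listed in the PR description.
  private val globChars: Set[Char] = "{}[]*?\\".toSet

  def isGlobPath(path: String): Boolean =
    path.exists(c => globChars.contains(c))

  // `glob` stands in for the real FileSystem.globStatus() call.
  def globPathIfNecessary(path: String, glob: String => Seq[String]): Seq[String] =
    if (isGlobPath(path)) glob(path) else Seq(path)
}
```

For a plain partition directory like `/data/year=2015/month=07`, no RPC is issued at all; only paths containing one of the special characters fall through to the glob call.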
3. **Listing leaf files in parallel when the number of input paths exceeds a threshold**

   Listing leaf files is required by partition discovery. Currently it is done on the driver side, and can be slow when there are lots of (nested) directories, since each `FileSystem.listStatus()` call issues an RPC. In this PR, we list leaf files in BFS style and resort to a Spark job once we find that the number of directories to be listed exceeds a threshold.

   The threshold is controlled by the `SQLConf` option `spark.sql.sources.parallelPartitionDiscovery.threshold`, which defaults to 32.
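The BFS strategy above can be sketched locally. This sketch uses `java.io.File` instead of Hadoop's `FileSystem.listStatus()`, and only marks where the real implementation would switch to a Spark job; all names are illustrative, not the PR's actual code:

```scala
import java.io.File
import scala.collection.mutable

// BFS-style leaf-file listing with a parallelism threshold. In the real PR
// the over-threshold branch distributes the listing as a Spark job; here we
// just note the switch point and keep listing locally.
def listLeafFiles(root: File, threshold: Int = 32): Seq[File] = {
  val leaves = mutable.Buffer.empty[File]
  var frontier: Seq[File] = Seq(root)
  while (frontier.nonEmpty) {
    if (frontier.size > threshold) {
      // Real implementation: distribute `frontier` across a Spark job here
      // and collect the listed statuses back to the driver.
    }
    val children =
      frontier.flatMap(d => Option(d.listFiles).map(_.toSeq).getOrElse(Seq.empty))
    val (dirs, files) = children.partition(_.isDirectory)
    leaves ++= files
    frontier = dirs // descend one BFS level per iteration
  }
  leaves.toSeq
}
```

Because the traversal is breadth-first, the frontier grows with the directory fan-out, so deeply partitioned tables cross the threshold quickly and get the distributed listing.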
4. **Discovering Parquet schema in parallel**

   Currently, schema merging is also done on the driver side and needs to read the footers of all part-files. This PR uses a Spark job to do schema merging. Together with task-side metadata reading in Parquet 1.7.0, we never read any footers on the driver side now.
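The shape of that distributed merge is a pairwise reduce over per-file schemas. As a toy illustration (a schema here is just a field-name-to-type map; Spark's real `StructType` merging handles nesting, nullability, and type widening, and the per-file read happens inside Spark tasks):

```scala
// Merge two schemas, failing on conflicting types for the same field.
def mergeSchemas(a: Map[String, String], b: Map[String, String]): Map[String, String] = {
  for ((name, tpe) <- b; existing <- a.get(name)) {
    require(existing == tpe, s"Conflicting types for field '$name': $existing vs $tpe")
  }
  a ++ b
}

// Each element stands in for the schema read from one part-file's footer;
// in the PR this would be an RDD reduced by a Spark job, not a local Seq.
val perFileSchemas = Seq(
  Map("id" -> "int"),
  Map("id" -> "int", "name" -> "string")
)
val merged = perFileSchemas.reduce(mergeSchemas)
```

Since the merge is associative, it can be applied in any order across tasks, which is what makes pushing it into a Spark job straightforward.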