# [SPARK-25164][SQL] Avoid rebuilding column and path list for each column in parquet reader #22188
## Conversation
Test build #95118 has finished for PR 22188 at commit
LGTM. Will leave here for a bit to see if anyone else comments...
Thanks @vanzin. In my benchmark tests, the tiny degradation (0.5%) in the lower column count cases is pretty consistent, which concerns me a little. I am going to re-run those tests in a different environment and see what happens. Edit: follow-up below.
That does seem counterintuitive, but no idea what could explain that, since the new code seems like a straightforwardly better version.
OK, I reran the tests for the lower column count cases, and the runs with the patch consistently show a tiny (1-3%) improvement compared to the master branch. So even the lower column count cases benefit a little. |
HyukjinKwon left a comment:
LGTM
gatorsmile left a comment:
LGTM
```java
// Check that the requested schema is supported.
missingColumns = new boolean[requestedSchema.getFieldCount()];
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<String[]> paths = requestedSchema.getPaths();
```
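For contrast, here is a hedged sketch of the loop shape this hunk replaces (illustrative only, not the exact pre-patch Spark source): the two accessor calls previously sat inside the per-column loop, and each call rebuilds its list by walking the schema.

```java
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

class PrePatchLoopShape {
  // Illustrative sketch of the old pattern: getColumns() and getPaths()
  // rebuild their lists by traversing the schema on every call, so invoking
  // them once per column costs roughly 2 * colCount * colCount traversal
  // steps for each parquet file.
  static void checkColumns(MessageType requestedSchema) {
    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
      ColumnDescriptor column = requestedSchema.getColumns().get(i); // O(colCount) call
      String[] path = requestedSchema.getPaths().get(i);             // O(colCount) call
      // Per-column validation elided in this sketch.
    }
  }
}
```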
thanks, merging to master!
@cloud-fan @gatorsmile Should we merge this also onto 2.2? It was a clean cherry-pick for me (from master to branch-2.2), and I ran the top and bottom tests (6000 columns, 1 million rows, 67 32M files, and 60 columns, 100 million rows, 67 32M files) from the PR description and got the same results.
@bersprockets The risk is pretty small, I think. I am fine with backporting it to previous versions. Why 2.2 only?
Only that I forgot that master is already on 2.4. We should do 2.3 as well, but I haven't tested it yet. Do I need to do anything on my end to get it into 2.2, and once I test, into 2.3? |
Normally, we do not backport such improvement PRs. However, the risk of this PR is pretty small. I think it is fine. Let me do this.
@gatorsmile Thanks much!
[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in parquet reader
## What changes were proposed in this pull request?
VectorizedParquetRecordReader::initializeInternal rebuilds the column list and path list once for each column. Therefore, it indirectly iterates `2 * colCount * colCount` times for each parquet file.
This inefficiency impacts jobs that read parquet-backed tables with many columns and many files. Jobs that read tables with few columns or few files are not impacted.
This PR changes initializeInternal so that it builds each list only once.
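As a rough illustration of the fixed pattern, here is a hedged sketch that hoists both lists out of the loop. The wrapper class, method name, and abbreviated validation are hypothetical; `getColumns()`, `getPaths()`, `containsPath()`, and `getMaxDefinitionLevel()` are real parquet-mr calls, but the real initializeInternal performs fuller schema checks than shown.

```java
import java.io.IOException;
import java.util.List;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

class MissingColumnsSketch {
  // Post-patch pattern: build each list once, then index into it per column.
  static boolean[] computeMissingColumns(MessageType requestedSchema,
                                         MessageType fileSchema) throws IOException {
    boolean[] missingColumns = new boolean[requestedSchema.getFieldCount()];
    List<ColumnDescriptor> columns = requestedSchema.getColumns(); // built once
    List<String[]> paths = requestedSchema.getPaths();             // built once
    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
      String[] colPath = paths.get(i);
      if (fileSchema.containsPath(colPath)) {
        missingColumns[i] = false; // column present in this file
      } else if (columns.get(i).getMaxDefinitionLevel() == 0) {
        // A required (non-nullable) column that is absent from the file
        // cannot be filled with nulls, so reading must fail.
        throw new IOException("Required column is missing: " + String.join(".", colPath));
      } else {
        missingColumns[i] = true; // optional column; the reader fills it with nulls
      }
    }
    return missingColumns;
  }
}
```

To put the old cost in perspective: with 6000 columns, `2 * colCount * colCount` works out to roughly 72 million schema-traversal steps per file, which lines up with the large gains in the wide-table benchmarks below.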
I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There is roughly one matching row for every 425 rows, and the matching rows are sprinkled pretty evenly throughout the table (that is, every page for column <code>id1</code> has at least one matching row).
6000 columns, 1 million rows, 67 32M files:
master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%
6000 columns, 1 million rows, 23 98M files:
master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%
600 columns, 10 million rows, 67 32M files:
master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%
60 columns, 100 million rows, 67 32M files:
master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%
## How was this patch tested?
- sql unit tests
- pyspark-sql tests
Closes #22188 from bersprockets/SPARK-25164.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>