[SPARK-20364][SQL] Support Parquet predicate pushdown on columns with dots #17680
Conversation
cc @ash211, @robert3005 and @liancheng. @liancheng, do you mind if I ask you to review this please?

Test build #75932 has finished for PR 17680 at commit
ash211 left a comment
This looks like it fixes the issue I reported (the last test confirms that), but I'm worried it might have caused a regression in pushdown on struct columns.
is there another existing test that checks that pushdown for the `struct.field1` syntax works correctly? I'm not sure how to reference inner fields in a struct column as I don't use them much personally, but I want to make sure that's not broken as a result of this change.
To my knowledge, we don't push down filters with nested columns. Let me check whether we already have the negative case explicitly, and add it if missing.
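For instance, a minimal sketch of such a negative case, assuming the `ParquetFilters.createFilter(schema, filter)` entry point as in Spark 2.x (the call site and placement are illustrative, not the final test):

```scala
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

// Inside ParquetFilterSuite (same package as the private ParquetFilters object):
// "a.b" refers to field b nested inside struct a, so no Parquet predicate
// should be created for it.
val schema = StructType(Seq(
  StructField("a", StructType(Seq(
    StructField("b", IntegerType))))))
assert(ParquetFilters.createFilter(schema, sources.EqualTo("a.b", 1)).isEmpty)
```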
thanks for the change -- I wasn't sure if predicate pushdown worked on nested columns, and it looks like the added test confirms it does not.
please also add the check for `IS NULL` returning 1 row too, so this is symmetric
Actually, IS NULL is not the problem for users here.

```scala
val path = "/tmp/abc"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NULL").show()
```

```
+--------+
|col.dots|
+--------+
|    null|
+--------+
```

The reason is that Parquet internally produces null permissively if the column does not exist (after we upgraded it to 1.8.2), so the predicate evaluates to true in this case, AFAIK. If this reasoning should be verified, I will look further. But in terms of the output, the issue is not reproduced.
I could add a test for when the record-by-record filter is enabled, though, after stripping the Spark-side filter.
thanks for adding the additional test below
I added a test case so that we make sure it does not push down filters in this case.

cc @davies too.

Test build #75962 has finished for PR 17680 at commit

Test build #75963 has finished for PR 17680 at commit

Test build #75964 has finished for PR 17680 at commit
ash211 left a comment
This looks great!
Now we need someone with merge permissions to review.
nit: pushdown
@ash211, thanks for your approval.

Test build #76006 has finished for PR 17680 at commit
gentle ping @liancheng and @davies.

@liancheng and @davies, if you are not sure about this approach, I could simply avoid pushing down the filters in this case for now. Please let me know.

Any further thoughts on this? It was quite surprising for one of our users, so I wanted to make sure it was fixed in a future Apache release.

gentle ping @liancheng and @davies
friendly ping ... |
|
Are there any comments on this PR or is it ready to be merged? |
|
gentle ping ... |
1 similar comment
|
gentle ping ... |
Instead of duplicating the code, please write a helper function.
We need to explain that the functions in this object `ParquetColumns` are based on the code in `org.apache.parquet.filter2.predicate`. Thus, when upgrading the Parquet version, we need to check whether they are still the same.
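For illustration, a sketch of the kind of note this asks for (the wording and placement are assumed here, not the actual patch):

```scala
package org.apache.spark.sql.execution.datasources.parquet

/**
 * Note that the functions in this object are based on the code in
 * org.apache.parquet.filter2.predicate. Thus, when upgrading the Parquet
 * version, we need to check whether they are still the same.
 */
private[parquet] object ParquetColumns {
  // ... dot-preserving column constructors mirroring FilterApi ...
}
```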
This sounds like it stems from this JIRA: https://issues.apache.org/jira/browse/PARQUET-389 (apache/parquet-java#354), after we upgraded to Parquet 1.8.2. Maybe @rdblue can help us review this PR? Thanks!
@HyukjinKwon This PR only verifies the behavior when column names have dots.
@HyukjinKwon Could we just stop pushing down predicates that involve column names containing dots? It would be a safe/simple fix.
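For illustration, a minimal sketch of that safe fix (the `canMakeFilterOn` helper name is hypothetical, not the existing API): bail out of predicate construction whenever the referenced name contains a dot, so Spark evaluates the filter itself.

```scala
// Hypothetical guard: a dotted name is ambiguous to Parquet's FilterApi
// (it parses as a nested path), so refuse to push such filters down.
def canMakeFilterOn(name: String): Boolean = !name.contains(".")

assert(canMakeFilterOn("coldots"))   // predicate may be pushed down
assert(!canMakeFilterOn("col.dots")) // fall back to Spark-side filtering
```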
Yes, it looks related to that, in particular here: #17680 (comment). To my knowledge, we don't support pushing down filters with nested column access, and we already have this assumption in the existing code. Sure, this PR tries the latter case as described in the PR description. Let me open another one for the former approach (simply avoiding the pushdown) as a simple workaround for now.
Yes. Please open the PR to stop predicate push-down for these corner cases. Will review it when it is done.

Test build #76951 has finished for PR 17680 at commit
…ng dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns having dots in their names. It is a different approach from #17680. The downside of this PR is that it literally does not push down filters on columns having dots in Parquet files at all (neither at the record level nor at the row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly but in a hacky way to support this case. I assume we prefer a safe way here, using the Parquet API properly, but this does close that PR as we are basically just avoiding the case here. This looks like a simple workaround, and it is probably fine given that the problem is arguably a rather corner case (although it might end up reading whole row groups under the hood, so neither option looks best).

Currently, if there are dots in the column name, predicate pushdown seems to fail in Parquet.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

Author: hyukjinkwon <[email protected]>

Closes #18000 from HyukjinKwon/SPARK-20364-workaround.
…ng dots in the names (cherry picked from commit 8fb3d5c) Signed-off-by: Xiao Li <[email protected]>
@gatorsmile, sorry for not responding, I was on vacation for a few days. Should I still review this even though it is merged?

There's an open PR (#361) to support quoted column names, but the discussion on its merits is ongoing. I don't see a huge benefit to supporting ...

That merged PR is a workaround that simply avoids this case, and we should still deal with this (in Parquet or Spark). I am closing this because I don't think this is going to be merged soon.
This PR enables Parquet predicate pushdown for fields having dots in the names: #27780
## What changes were proposed in this pull request?
Currently, if there are dots in the column name, predicate pushdown seems to fail in Parquet.
**With dots**
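```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```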
**Without dots**
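```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```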
It seems a dot in the column name via `FilterApi` tries to separate the field name by the dot (`ColumnPath` with multiple column paths) whereas the actual column name is `col.dots`. (See FilterApi.java#L71, which calls ColumnPath.java#L44.) I tried to come up with ways to resolve this and came up with the two below (as I could not find a way to keep the dots as they are):
One is simply not to push down filters when there are dots in column names, so that Spark reads everything and filters on the Spark side.
The other way creates Spark's own `FilterApi` for those columns (the Parquet one seems final) so that it always uses a single column path on the Spark side (this seems hacky), as we are not pushing down nested columns currently. So, it looks like we can get a field name via `ColumnPath.get`, not `ColumnPath.fromDotString`, in this way. This PR proposes the latter way because I think we need to be sure that it passes the tests.
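For illustration, a small sketch of that distinction, calling Parquet's `ColumnPath` directly (behavior as I understand it from the sources linked above; assumes parquet-common on the classpath):

```scala
import org.apache.parquet.hadoop.metadata.ColumnPath

// fromDotString splits on dots: a literal column named "col.dots"
// turns into a two-segment, nested-looking path...
val split = ColumnPath.fromDotString("col.dots")
assert(split.toArray.length == 2) // ["col", "dots"]

// ...whereas ColumnPath.get keeps it as a single path segment.
val single = ColumnPath.get("col.dots")
assert(single.toArray.length == 1) // ["col.dots"]
```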
**After**
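```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```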
## How was this patch tested?
Existing tests should cover this. Some tests were added in `ParquetFilterSuite.scala`. Manually, I ran the related tests, and Jenkins tests will cover this.