[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names #18000
Conversation
cc @gatorsmile, who reviewed the latter approach in that PR; @cloud-fan, who reviewed some of my previous Parquet-related PRs; @liancheng and @viirya, who I believe are familiar with this code path; @ash211, who found this issue; and @marmbrus, who took action on the JIRA.
Added the link to PARQUET-389 in the JIRA SPARK-20364
How about this?

Parquet does not allow dots in the column name because dots are used as a column path delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates with missing columns, so pushing down a filter for a column whose name contains dots can return incorrect results. Thus, we do not push down such filters. See SPARK-20364.
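For reference, a minimal sketch of the guard this comment would document (the helper name is illustrative, not the exact patch):

```scala
// Sketch: refuse to create a Parquet filter predicate for any column whose
// name contains a dot, since Parquet reads "col.dots" as the nested path
// ["col", "dots"] rather than one flat column.
def canMakeFilterOn(name: String): Boolean = !name.contains(".")
```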
Sure, it looks much nicer.
dataTypeOf -> nameTypeMap
private
(this is a nested function)
Now, the fix is much safer for merging to 2.2. I will review the test case later.
Test build #76958 has finished for PR 18000 at commit
In addition, we need to be aware of the limitations on column names in Parquet going forward. See the related PR in Parquet: apache/parquet-java#361
^ Yea, maybe. We need to make sure about Lines 565 to 572 in 3bfb639.
Force-pushed from ae8ea08 to 20adf72.
Test build #76987 has finished for PR 18000 at commit

Test build #76989 has finished for PR 18000 at commit
It seems to me that a missing column is treated like a NULL value. The results will change only for some predicates, e.g., IsNull and IsNotNull. For other predicates, can we still push them down?
Yes, but the problem is that, to my knowledge, Parquet (almost) always evaluates the predicate against NULL when the column name contains dots, because the column path becomes nested (a.b, not `a.b`) in the Parquet filter predicate.

You are right about IsNull. I pointed this out in #17680 (comment): it looks like Parquet (almost) always evaluates it to true on the Parquet side, but the rows are then filtered on the Spark side. So, for input/output it is not an issue in this case, but I believe we should disable pushdown for this case too.
I think this example explains the case:

```scala
val dfs = Seq(
  Seq(Some(1), None).toDF("col.dots"),
  Seq(Some(1L), None).toDF("col.dots"),
  Seq(Some(1.0F), None).toDF("col.dots"),
  Seq(Some(1.0D), None).toDF("col.dots"),
  Seq(true, false).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots")
)
val predicates = Seq(
  "`col.dots` > 0",
  "`col.dots` >= 1L",
  "`col.dots` < 2.0",
  "`col.dots` <= 1.0D",
  "`col.dots` == true",
  "`col.dots` IS NOT NULL",
  "`col.dots` IS NULL"
)
dfs.zip(predicates).zipWithIndex.foreach { case ((df, predicate), i) =>
  val path = s"/tmp/abcd$i"
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path).where(predicate).show()
}
```

```
+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
|    null|
+--------+
```
In any case, I believe we should disable it, because the pushed Parquet filter appears to reference a different column.
Do we need to consider other special characters, e.g., those in apache/parquet-java#361?
I think we should disallow such names when initially loading or writing out, if they are still allowed anywhere.
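If we did disallow such names at write time, the guard could look roughly like this (a sketch only; the helper name and the invalid-character set are assumptions, loosely mirroring Parquet's documented restrictions):

```scala
// Hypothetical write-path guard: reject column names that Parquet
// cannot represent reliably. The character set here is an assumption.
def checkFieldName(name: String): Unit = {
  val invalid = " ,;{}()\n\t="
  require(!name.exists(c => invalid.contains(c)),
    s"Attribute name '$name' contains invalid character(s) among '$invalid'")
}
```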
Don't we need the filterClass check? Why remove it?
"Shouldn't generate filter predicate for $pred"?
We should also check the correctness of the results.
@gatorsmile, @viirya, I addressed your comments. Could you take another look when you have some time?
Test build #77026 has finished for PR 18000 at commit
Instead of checking dots for each predicate recursively in ParquetFilters, we can check Filter.references of the predicate at the top level in ParquetFileFormat, and skip ParquetFilters.createFilter entirely.
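For reference, a sketch of this alternative, using the data source Filter API's references field (which lists every column name a filter touches); the helper name is illustrative, not from the actual patch:

```scala
import org.apache.spark.sql.sources.Filter

// Drop, at the top level, any pushed filter that references a column
// whose name contains a dot, so ParquetFilters.createFilter is never
// called for it.
def pushableFilters(filters: Seq[Filter]): Seq[Filter] =
  filters.filterNot(_.references.exists(_.contains(".")))
```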
Hm, I expect this is a non-critical path that is not executed many times. Also, calling Filter.references -> Filter.findReferences -> Filter.references ... does not look particularly faster. Another downside (maybe nitpicking) is that it would introduce another small code path that returns None for filter creation failure.
Not just for speed, but also for the amount of code that needs to change. But it is OK with me.
We should log something for users, because it might not be obvious to them that predicate pushdown is disabled for columns with dots. Since this is bad for performance, it seems better to let users know what happened.
Yes, it is. However, we don't currently log pushed filters that failed to be created either, AFAIK. Probably we should log those cases across all the sources. If you don't feel strongly about this, I would rather not log here for now.
OK. That makes sense.
Just to make sure: I don't feel strongly about either comment, @viirya. I am willing to fix them if you feel strongly (I already have a fixed version locally). Please let me know.
Sounds OK to me.
Thank you @viirya.
Shall we make $ use UnresolvedAttribute.quotedString?
Yea, actually my initial local version included the change for symbol and $ to match them to Column. It also makes sense per #7969. I believe this is an internal API -

> * considered an internal API to Spark SQL and are subject to change between minor releases.

Nevertheless, I believe some people don't like this change much, and I wanted to avoid such changes here for now (it is only needed in a single place anyway ...).
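For context, a rough sketch of the difference being discussed, assuming Catalyst's UnresolvedAttribute (behavior may differ across versions):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// The plain factory splits on dots, treating the name as a nested path...
UnresolvedAttribute("col.dots").nameParts        // Seq("col", "dots")
// ...while `quoted` keeps the whole string as one column name, which is
// what $"col.dots" would need in order to behave like Column("col.dots").
UnresolvedAttribute.quoted("col.dots").nameParts // Seq("col.dots")
```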
A high-level question: is this a Parquet bug, or does Spark not use the Parquet reader correctly?
I would rather say it is a limitation of the Parquet API. It looks like there is no way to properly set column names containing dots in Parquet filters. #17680 suggests a hacky workaround to set this.
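To illustrate the limitation, a sketch against Parquet's public FilterApi (the column name and type are just examples):

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.Operators.IntColumn

// FilterApi.intColumn splits the given string on dots, so a flat Spark
// column literally named "col.dots" is read as a nested path with two
// segments, col -> dots; there is no public way to escape the dot.
val column: IntColumn = FilterApi.intColumn("col.dots")
// column.getColumnPath now has two path segments ("col", "dots"), so any
// predicate built from it targets a nested field that does not exist in
// the file, and rows never match (or evaluate against NULL).
```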
nit: nameToType
Can we just have a simple end-to-end test? The fix is actually very simple, and it seems not worth such complex tests to verify it.
Sure, I just reverted it and added a simple test.
Force-pushed from c83f1b7 to 1eae64a.
test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names")
Looks much better now.
LGTM
LGTM. Is Parquet going to fix it in the future? Or is there any official way to support filter pushdown for column names with dots?
Based on the discussion in apache/parquet-java#361, it does not sound like Parquet will support it in the short term. We might need to live with this for a long time.
retest this please

test this please

ok to test

Seems Jenkins doesn't work for now.

LGTM pending Jenkins

Test build #77051 has finished for PR 18000 at commit
Thanks! Merging to master/2.2 |
This PR enables Parquet predicate pushdown for fields having dots in the names: #27780
What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns having dots in their names. It is a different approach from #17680. The downside of this PR is that it does not push down filters on such columns in Parquet files at all (neither at the record level nor at the row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly, supporting this case only in a hacky way.

I assume we prefer the safe way here, using the Parquet API properly, but this does close that PR since here we are basically just avoiding the pushdown.

This looks like a simple workaround, and it is probably fine given that the problem is arguably a rather corner case (although it might end up reading whole row groups under the hood, so neither option is ideal).

Currently, if there are dots in the column name, predicate pushdown fails in Parquet.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

How was this patch tested?

Unit tests added in ParquetFilterSuite.
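For reference, the end-to-end test could look roughly like this (a sketch assuming ParquetFilterSuite's usual helpers such as withTempPath and checkAnswer; the exact test body is in the PR):

```scala
test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
  withTempPath { path =>
    Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
    val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
    // With pushdown disabled for dotted names, the non-null row survives.
    checkAnswer(readBack, Row(1))
  }
}
```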