-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet #36328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet #36328
Conversation
| .createWithDefault(true) | ||
|
|
||
| val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED = | ||
| buildConf("spark.sql.parquet.filterPushdown.stringPredicate") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since spark.sql.parquet.filterPushdown.string.startsWith is internal why not replacing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid exising users who have already use it.
|
Can one of the admins verify this patch? |
| } | ||
|
|
||
| case sources.StringEndsWith(name, prefix) | ||
| if pushDownStringPredicate && canMakeFilterOn(name, prefix) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent, following L750.
| } | ||
|
|
||
| case sources.StringContains(name, value) | ||
| if pushDownStringPredicate && canMakeFilterOn(name, value) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
| Option(prefix).map { v => | ||
| FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames), | ||
| new UserDefinedPredicate[Binary] with Serializable { | ||
| private val strToBinary = Binary.fromReusedByteArray(v.getBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just keep UTF8String.fromBytes(strToBinary.getBytes) instead doing it every time in keep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. updated
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few test failures in ParquetV2FilterSuite.
| .doc("If true, enables Parquet filter push-down optimization for string predicate such " + | ||
| "as startsWith/endsWith/contains function. This configuration only has an effect when " + | ||
| "'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.") | ||
| .version("3.3.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3.4.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
| } | ||
| } | ||
|
|
||
| test("filter pushdown - StringEndsWith/Contains") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need to test StringStartsWith here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StringStartsWith push down is an existing feature and has been tested at L1426
Line 1426 in 5046b8c
| test("filter pushdown - StringStartsWith") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I mean this test specially does test with testStringPredicateWithDictionaryFilter, but I don't see StringStartsWith is included here. Don't we need to do testStringPredicateWithDictionaryFilter for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we merge the existing startswith test into the new one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
|
cc @wangyum |
|
lgtm if CI can pass. |
|
cc @huaxingao |
|
Looks good to me overall. Do we need a test similar to this one test("filter pushdown - StringStartsWith") to make sure the the filters for |
done |
|
@WangGuangxin can you fix the error in "Scala 2.13 build with SBT"? |
done |
|
Thanks. Merging to master. |
| Seq( | ||
| "value like 'a%'", // StartsWith | ||
| "value like '%a'", // EndsWith | ||
| "value like '%a%'" // Contains |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick comment. How does this verify the "keep()" test? Shouldn't it also be "canDrop()"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test assume that dictionary filtering is enabled or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the test is buggy and does not reflect the actual implementation of the filter. NumRowGroupAcc does not actually count row groups, it counts the number of records passed through the filter. For example, for the contains filter we should still read all of the row groups.
Example of the log:
[canDrop] statistics=org.apache.parquet.filter2.predicate.Statistics@52cd90cd => false
[canDrop] statistics=org.apache.parquet.filter2.predicate.Statistics@64e59f7e => false
[keep] statistics=Binary{1 constant bytes, [49]} => false
[keep] statistics=Binary{1 constant bytes, [50]} => false
[keep] statistics=Binary{1 constant bytes, [51]} => false
[keep] statistics=Binary{1 constant bytes, [52]} => false
...
Can the author update the test to reflect the implementation? cc @cloud-fan @sunchao.
You may need to enforce things like row group/dictionary filtering as well as record level filtering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @sadikovi , the NumRowGroupsAcc is the actually filtered row groups, you can find it here
Line 130 in a1aa200
| if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) { |
As to the keep() test, the dictionary filter is enabled and there are duplicated records in test data, so parquet will generate dictionary when writing data and dictionary filter is used when reading it.
When we test canDrop, the test data has no duplicate so there is no dictionary generated in parquet, statistics row group filter is used which will call canDrop.
Correct me if I'm wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that was my point - the test needs to be updated to make sure dictionary pages are written and the dictionary filtering is enabled. Without it, the test does not verify the implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open a follow-up PR to update the test? You can explicitly enable dictionary filtering in the test for the "keep" part of the test to highlight that the test passes due to dictionary filtering, otherwise it could be confusing for people.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enabling dictionary does not control dictionary filtering, there is a separate flag for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the discussions here, seems this is not a simple thing. @sadikovi can you open a followup PR directly to demonstrate your idea?
### What changes were proposed in this pull request? This PR updates `FilterPushdownBenchmark` results. ### Why are the changes needed? These PRs do not or do not fully update the `FilterPushdownBenchmark` results: #36328 #36629 #36892 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? N/A. Closes #37022 from wangyum/SPARK-39631. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
What changes were proposed in this pull request?
Push down StringEndsWith/Contains to Parquet so that we can leverage Parquet Dictionary Filtering
Why are the changes needed?
Improve performance.
FilterPushDownBenchmark:
Does this PR introduce any user-facing change?
No
How was this patch tested?
added UT