[SPARK-31365][SQL] Enable nested predicate pushdown per data sources #28366
Test build #121891 has finished for PR 28366 at commit
Test build #121981 has finished for PR 28366 at commit
Test build #122051 has finished for PR 28366 at commit
| .version("3.0.0") | ||
| .booleanConf | ||
| .createWithDefault(true) | ||
| .stringConf |
Do we need `.transform(_.toUpperCase(Locale.ROOT))` here? Also, could we validate the input with `checkValues`? By the way, is this feature expected to cover custom data sources in addition to the prebuilt ones (parquet, orc, ...)?
We compare this list using `toLowerCase` at the point where we need it, so it seems fine to leave it as is. A similar example is `spark.sql.sources.useV1SourceList`. As with `useV1SourceList`, `checkValues` does not seem to be needed.
For now I think it is safer to assume that custom data sources don't support this feature. I also think that if a custom data source wants to support it, it is better for it to adopt data source v2.
We don't have a common API for v1 data sources that tells whether a source supports nested predicate pushdown. If we really want to allow custom v1 data sources to have that, we can consider adding a common v1 API for the purpose. But, again, it seems to me that we should encourage adopting v2 instead of adding new things to v1.
Currently I think it is safer to assume custom data sources don't support this feature.
Yea, +1 on your thought.
Currently I think it is safer to assume custom data sources don't support this feature.
Looks fine. @dbtsai are you good with it? Do you have use cases that need nested predicate pushdown for non-file-source?
In v1, we don't have any use-case for supporting it in custom data source. I'm good with it.
| "while ORC only supports predicates for names containing `dots`. The other data sources" + | ||
| "don't support this feature yet.") | ||
| val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST = | ||
| buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList") |
nit: How about v1sourceList -> supportedV1Sources?
sure.
| .internal() | ||
| .doc("A comma-separated list of data source short names or fully qualified data source " + | ||
| "implementation class names for which Spark tries to push down predicates for nested " + | ||
| "columns and or names containing `dots` to data sources. Currently, Parquet implements " + |
nit: and or -> and?
Actually, I copied the wording. I think it means "and/or". I will modify it.
| "implementation class names for which Spark tries to push down predicates for nested " + | ||
| "columns and or names containing `dots` to data sources. Currently, Parquet implements " + | ||
| "both optimizations while ORC only supports predicates for names containing `dots`. The " + | ||
| "other data sources don't support this feature yet.") |
How about listing the valid set of sources, e.g. "The value can be 'parquet', 'orc', .... The default value is 'parquet,orc'."?
| val supportedDatasources = | ||
| SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST) | ||
| .toLowerCase(Locale.ROOT) | ||
| .split(",").map(_.trim) |
Could we use Utils.stringToSeq?
ok.
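For readers following the thread, here is a minimal, self-contained sketch of the parsing and lookup being discussed: lower-case the configured value with `Locale.ROOT`, split it on commas, trim each entry, and check membership. The object name `NestedPushdownSourceList` and the method `isSupported` are hypothetical, and the local `stringToSeq` is only a stand-in assumed to behave like Spark's `Utils.stringToSeq`; this is not the code from the PR.

```scala
import java.util.Locale

// Hypothetical helper object; names are illustrative, not Spark APIs.
object NestedPushdownSourceList {
  // Stand-in for Utils.stringToSeq (assumed behavior): split on commas,
  // trim each entry, and drop empty entries.
  def stringToSeq(str: String): Seq[String] =
    str.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // True when `sourceName` appears in the comma-separated `confValue`.
  // Both sides are lower-cased with Locale.ROOT, so the comparison is
  // case-insensitive and does not depend on the JVM default locale.
  def isSupported(confValue: String, sourceName: String): Boolean =
    stringToSeq(confValue.toLowerCase(Locale.ROOT))
      .contains(sourceName.toLowerCase(Locale.ROOT))
}
```

Because the comparison lower-cases both sides at lookup time, the config value itself does not need a `transform` step, which matches the reasoning given earlier in the thread.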
| * translated [[Filter]]. The map is used for rebuilding | ||
| * [[Expression]] from [[Filter]]. | ||
| * @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is | ||
| * disabled. |
What does "Default is disabled" mean? Should we add a default value to the argument, like `nestedPredicatePushdownEnabled: Boolean = false`?
Oh, forgot to change it. I was adding default value but removed it later.
| PushableColumnAndNestedColumn | ||
| } else { | ||
| PushableColumnWithoutNestedColumn | ||
| } |
How about moving this check to the `PushableColumn` object?
object PushableColumn {
  def apply(nestedPredicatePushdownEnabled: Boolean) = {
    if (nestedPredicatePushdownEnabled) {
      PushableColumnAndNestedColumn
    } else {
      PushableColumnWithoutNestedColumn
    }
  }
}
ok
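To illustrate how such a factory could be used at a call site, here is a simplified, self-contained sketch. The toy `Expr`, `Attr`, and `GetField` types and the `columnName` helper are hypothetical stand-ins for Catalyst expressions, and the two extractors are deliberately reduced versions of the idea rather than Spark's actual implementations.

```scala
// Simplified sketch; types and helper names are assumptions for illustration.
object PushableColumnSketch {
  // Toy expression tree standing in for Catalyst expressions.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class GetField(child: Expr, field: String) extends Expr

  // Shared extractor shape: yield the pushable column name, if any.
  trait PushableColumnBase {
    def unapply(e: Expr): Option[String]
  }

  // Accepts top-level attributes and nested fields such as a.b.
  object PushableColumnAndNestedColumn extends PushableColumnBase {
    def unapply(e: Expr): Option[String] = e match {
      case Attr(name) => Some(name)
      case GetField(child, field) => unapply(child).map(p => s"$p.$field")
    }
  }

  // Accepts only top-level attributes whose names contain no dots.
  object PushableColumnWithoutNestedColumn extends PushableColumnBase {
    def unapply(e: Expr): Option[String] = e match {
      case Attr(name) if !name.contains(".") => Some(name)
      case _ => None
    }
  }

  // The factory suggested in the review: choose the extractor once.
  object PushableColumn {
    def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase =
      if (nestedPredicatePushdownEnabled) PushableColumnAndNestedColumn
      else PushableColumnWithoutNestedColumn
  }

  // Example call site: the chosen extractor is a stable identifier,
  // so it can be used directly in a pattern.
  def columnName(e: Expr, nested: Boolean): Option[String] = {
    val pushableColumn = PushableColumn(nested)
    e match {
      case pushableColumn(name) => Some(name)
      case _ => None
    }
  }
}
```

Keeping the branch inside the factory means callers hold a single extractor value and do not repeat the `if`/`else` wherever filters are translated.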
| DataSourceUtils.supportNestedPredicatePushdown(fsRelation) | ||
| val pushedFilters = dataFilters | ||
| .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) | ||
| logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}") |
nit: logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
| case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) => | ||
| OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil | ||
| OverwriteByExpressionExecV1( | ||
| v1, transferFilters(filters, false), writeOptions.asOptions, query) :: Nil |
This is the v1 fallback API, which is new in DS v2. I think we can always support nested filter pushdown here.
ok. got it. thanks.
Test build #122162 has finished for PR 28366 at commit
Test build #122175 has finished for PR 28366 at commit
maropu
left a comment
No more comment now and looks okay to me.
| DataSourceUtils.supportNestedPredicatePushdown(fsRelation) | ||
| val pushedFilters = dataFilters | ||
| .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) | ||
| logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}") |
Is it possible to propagate this back so that when a user runs `explain(true)`, the pushed-down filters are shown?
The pushed-down filters are already shown in `FileSourceScanExec`.
LGTM. Thanks.
| }.toArray | ||
| val filters = splitConjunctivePredicates(deleteExpr) | ||
| def transferFilters = | ||
| (filters: Seq[Expression], supportNestedPredicatePushdown: Boolean) => { |
Do we need the `supportNestedPredicatePushdown` parameter here, given that the caller side always passes true?
| "while ORC only supports predicates for names containing `dots`. The other data sources" + | ||
| "don't support this feature yet.") | ||
| val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST = | ||
| buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedV1Sources") |
supportedV1Sources -> supportedFileSources?
DS v1 and file source are different APIs and have different planner rules/physical nodes.
Test build #122305 has finished for PR 28366 at commit
Test build #122304 has finished for PR 28366 at commit
retest this please
Test build #122310 has finished for PR 28366 at commit
thanks, merging to master/3.0!
### What changes were proposed in this pull request?
This patch proposes to replace `NESTED_PREDICATE_PUSHDOWN_ENABLED` with `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST` which can configure which v1 data sources are enabled with nested predicate pushdown.
### Why are the changes needed?
We added nested predicate pushdown feature that is configured by `NESTED_PREDICATE_PUSHDOWN_ENABLED`. However, this config is all or nothing config, and applies on all data sources.
In order to not introduce API breaking change after enabling nested predicate pushdown, we'd like to set nested predicate pushdown per data sources. Please also refer to the comments #27728 (comment).
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added/Modified unit tests.
Closes #28366 from viirya/SPARK-31365.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4952f1a)
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon
left a comment
LGTM too, one comment. Thanks for working on this @viirya.
| "implementation class names for which Spark tries to push down predicates for nested " + | ||
| "columns and/or names containing `dots` to data sources. Currently, Parquet implements " + | ||
| "both optimizations while ORC only supports predicates for names containing `dots`. The " + | ||
| "other data sources don't support this feature yet. So the default value is 'parquet,orc'.") |
It seems we decided to make this configuration effective only for DSv1, which seems okay because only DSv1 will have compatibility issues.
But shall we at least explicitly mention that this configuration is only effective with DSv1 (or make it effective for DSv2 as well)? Otherwise it is going to be confusing to both end users and developers.
I think the DSv2 API is supposed to support nested column capabilities such as pushdown and pruning, so we only need to deal with DSv1 compatibility issues here. More precisely, with the file sources.
I will create a simple follow-up to refine the doc of this configuration on this point. Thanks.
Thanks!
…ate pushdown
### What changes were proposed in this pull request?
This is a followup to address the #28366 (comment) by refining the SQL config document.
### Why are the changes needed?
Make the config document less confusing for developers.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Only doc change.
Closes #28468 from viirya/SPARK-31365-followup.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
(cherry picked from commit 9bf7387)
Signed-off-by: Takeshi Yamamuro <[email protected]>
What changes were proposed in this pull request?
This patch proposes to replace `NESTED_PREDICATE_PUSHDOWN_ENABLED` with `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST`, which configures the v1 data sources for which nested predicate pushdown is enabled.
Why are the changes needed?
We added a nested predicate pushdown feature that is configured by `NESTED_PREDICATE_PUSHDOWN_ENABLED`. However, this config is all-or-nothing and applies to all data sources.
To avoid introducing an API-breaking change after enabling nested predicate pushdown, we'd like to configure nested predicate pushdown per data source. Please also refer to the comments #27728 (comment).
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added/Modified unit tests.