[SPARK-14172][SQL] Hive table partition predicate not passed down correctly #13893
Conversation
Likely there's a bug. I think you want to use "substitutedFilters" instead of "deterministicFilters" here. I think you can also add a test with substitution for it.
@AlekseiS You are right! I have fixed this, thank you!
@cloud-fan could you please have a look at this PR?
I'm not sure if it's safe to push it down. For non-deterministic expressions, the order (or number) of input rows matters. If we push down the deterministic part of a filter condition, the input rows to the remaining filter condition will change and may produce a wrong answer.
cc @liancheng @yhuai
Good question. I think technically we can't push down any predicates that are placed after a non-deterministic predicate; otherwise the number of input rows may change and lead to wrong query results.
Yes, you are right. I thought the deterministic part could always be pushed down safely, but it cannot; in fact, the order of each part should also be considered. For example, `rand() < 0.01 AND partition_col = 'some_value'` should not be pushed down, but `partition_col = 'some_value' AND rand() < 0.01` still could be.
Thank you for your kind reply!
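The ordering point above can be sketched with a minimal, hypothetical model in plain Scala (not Spark code): a "nondeterministic" predicate is modeled as a stateful function whose outcome depends on how many rows it has already seen, just as `rand()` consumes its random stream per row. All names here are illustrative.

```scala
// Models a nondeterministic predicate: its result depends on its call count.
def everyOther(): Int => Boolean = {
  var calls = -1
  _ => { calls += 1; calls % 2 == 0 } // keeps the 1st, 3rd, 5th ... row it sees
}

val rows = Seq(1, 2, 1, 2, 1, 2)

// `nondet(r) AND r == 1`: the nondeterministic part runs first, on every row.
val nd1 = everyOther()
val original = rows.filter(r => nd1(r) && r == 1)

// Pushing `r == 1` below the nondeterministic predicate changes which rows
// (and how many) the stateful predicate sees, so the result changes.
val nd2 = everyOther()
val pushedDown = rows.filter(_ == 1).filter(nd2)

// `r == 1 AND nondet(r)`: the deterministic part already runs first, so
// pushing it down is safe -- the nondeterministic predicate sees the same
// rows either way.
val nd3 = everyOther()
val safeOrder = rows.filter(r => r == 1 && nd3(r))
```

Running this, `original` and `pushedDown` differ, while `safeOrder` matches `pushedDown`, mirroring the two orderings discussed above.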
@liancheng I think partition predicates are a bit different. If you explicitly specify a partition predicate, like `date = '2016-06-27'`, do you really expect other partitions to be scanned, regardless of whether you use a non-deterministic function? Most likely not, so if a partition filter is specified and it is deterministic, it is expected to always be used.
@AlekseiS I think we should always consider the risk and take care of it even if it's not a common case. We can't assume what users expect, and should prepare for the worst case.
@cloud-fan I pushed a commit that applies predicate pushdown only to the deterministic parts placed before any non-deterministic predicates. Should it be safe to do this optimization?
No, the predicate order doesn't matter. Our optimizer can reorder predicates to run them more efficiently.
Predicates should not be reordered if a condition contains non-deterministic parts. For example, `rand() < 0.1 AND a = 1` should not be reordered to `a = 1 AND rand() < 0.1`, as the number of calls to rand() would change and thus output different rows. @cloud-fan @liancheng
It's a good point; it looks like we can also improve the PushDownPredicate rule.
If PushDownPredicate should be improved, I would like to send a PR in one or two days. Should I open a separate JIRA to track that issue? @cloud-fan
@jiangxb1987 Please feel free to create a new JIRA ticket and PR for this, thanks!
With PR #14012, the order between deterministic and non-deterministic predicates will no longer be changed arbitrarily, so I think we can apply this improvement, which pushes down the predicates placed before the non-deterministic parts of partition conditions, so that we can do partition pruning even when a condition contains non-deterministic parts. @liancheng @cloud-fan
retest this please
Test build #62596 has finished for PR 13893 at commit
ping @cloud-fan
```scala
// Deterministic parts of filter condition placed before non-deterministic predicates could
// be pushed down safely.
val (pushDown, rest) =
```
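The split hinted at in the snippet above can be sketched with `span`: only the deterministic prefix of the conjuncts, up to the first non-deterministic one, may be pushed down. The `Pred` class and names here are illustrative stand-ins, not Catalyst's real API.

```scala
// Illustrative stand-in for a predicate expression with a determinism flag.
case class Pred(sql: String, deterministic: Boolean)

// Take the deterministic prefix; everything from the first nondeterministic
// conjunct onward stays where it is, preserving evaluation order.
def splitPushable(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) =
  conjuncts.span(_.deterministic)

val conjuncts = Seq(
  Pred("partition_col = 'some_value'", true),
  Pred("rand() < 0.1", false),
  Pred("a = 1", true))

val (pushDown, rest) = splitPushable(conjuncts)
// Only the partition predicate is pushed; `a = 1` stays behind the
// nondeterministic conjunct because pushing it would reorder evaluation.
```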
After thinking about it more, I think it's not safe to do so. collectProjectsAndFilters should return all deterministic projects and filters above a scan node, and the returned filter conditions are not only used for filter pushdown but are also treated as the whole set of filters above this scan node. So the rest of the conditions here won't get executed.
cc @liancheng to confirm this.
@cloud-fan Thanks for your comment! But I did search the codebase and found the returned filters are only used for predicate pushdown or partition pruning; in both cases it should be safe for us to drop the rest of the condition. Thank you!
Can you write a test for this? The logic in DataSourceStrategy shows that, when we get a scan node with the projects and filters above it, we rebuild the project and filter (with project lists and filter conditions merged) and wrap the scan node with them. So any filter condition that isn't returned by collectProjectsAndFilters won't get executed.
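A toy illustration (assumed names, not DataSourceStrategy's real code) of why conditions omitted from the returned filters are silently lost: the planner rebuilds the Filter solely from what was returned, so anything left out simply never runs.

```scala
// A tiny stand-in for a table row and its filter conditions.
case class Row(part: String, a: Int)

val data = Seq(Row("x", 1), Row("x", 2), Row("y", 1))

// The full condition on the query: part = 'x' AND a > 1.
val allConditions: Seq[Row => Boolean] =
  Seq(_.part == "x", _.a > 1)

// The planner rebuilds the filter only from the conditions it was handed.
def rebuild(filters: Seq[Row => Boolean]): Seq[Row] =
  data.filter(r => filters.forall(_(r)))

val correct = rebuild(allConditions)         // both conditions execute
val dropped = rebuild(allConditions.take(1)) // the omitted filter never runs
// `dropped` wrongly keeps Row("x", 1): the a > 1 condition was lost.
```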
I also think that silently dropping non-deterministic filters can be dangerous. Maybe we should just return all operators beneath the top-most non-deterministic filter as the bottom operator.
For example, say we have a plan tree like this:

```
Project a, b
  Filter a > 1
    Filter b < 3
      Filter RAND(42) > 0.5
        Filter c < 2
          TableScan t
```
We should return the following result:

```scala
(
  // Project list
  Seq(a, b),
  // Deterministic filters
  Seq(b < 3, a > 1),
  // The top-most nondeterministic filter with all operators beneath
  Filter(RAND(42) > 0.5,
    Filter(c < 2,
      TableScan(t)))
)
```
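The collection described above can be modeled with a toy plan ADT (not Catalyst's real classes): walk deterministic filters top-down, stop at the first non-deterministic one, and return everything beneath it intact as the bottom plan.

```scala
// Toy plan ADT; names and shapes are illustrative only.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filt(cond: String, deterministic: Boolean, child: Plan) extends Plan

// Collect deterministic filters until the first nondeterministic one;
// the subtree below that point is returned unchanged.
def collectFilters(p: Plan, acc: List[String] = Nil): (List[String], Plan) =
  p match {
    case Filt(c, true, child) => collectFilters(child, c :: acc)
    case other                => (acc.reverse, other)
  }

val plan =
  Filt("a > 1", true,
    Filt("b < 3", true,
      Filt("RAND(42) > 0.5", false,
        Filt("c < 2", true,
          Scan("t")))))

val (filters, bottom) = collectFilters(plan)
// filters: the deterministic filters above the RAND filter
// bottom: the RAND filter with everything beneath it, kept intact
```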
Thank you @cloud-fan for pointing that out; I realized my previous thoughts were wrong. I fully agree with @liancheng's improvement idea. I will update the related code as well as new test cases tomorrow.
After an offline discussion with @liancheng, we think it would be better to have a wrapper node for scans (table scan or file scan), and this wrapper node can also hold the project list and filter conditions. Then in the optimizer we can improve the ColumnPruning and FilterPushdown rules to push down into this wrapper node. After this we don't need PhysicalOperation anymore and the planner can match on the wrapper node directly.
@cloud-fan Do you mean something like adding the following to basicLogicalOperators:

```scala
case class Scanner(
    projectionList: Seq[NamedExpression],
    filters: Seq[Expression],
    child: LogicalPlan)
  extends UnaryNode
```

And passing that to the planner instead of applying PhysicalOperation?
I'm willing to take this work. Thanks!
yup, thanks!
@cloud-fan Now I can insert a Scanner operator over a CatalogRelation in the Optimizer, but I noticed a relation may also be something like `l: LogicalRelation(relation: CatalogRelation, _, _)`. In this case we can't analyze the class LogicalRelation, because it's in the spark-sql package while the Optimizer is in spark-catalyst, so we are not able to determine whether a Scanner should be added. I think we don't want to add Scanner over every BaseRelation.
@cloud-fan I've sent a PR to add
ping @jiangxb1987 @cloud-fan
@jiangxb1987 do we still have this bug?
Yes, this still exists. Let me find some time to resolve this.
@heary-cao tried to resolve the same issue in #18969. ping @jiangxb1987
Ping, @jiangxb1987.
What changes were proposed in this pull request?
Currently the partition predicate is not passed down correctly when a condition contains non-deterministic parts. This PR changes the logic in collectProjectsAndFilters() to add the deterministic parts into filters, so that the partition predicate can be passed down correctly.
How was this patch tested?
A new test was added in PruningSuite.