-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18609][SQL]Fix when CTE with Join between two table with same column name #16255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18609][SQL]Fix when CTE with Join between two table with same column name #16255
Conversation
|
Test build #70021 has finished for PR 16255 at commit
|
|
@windpiger I am not sure that this is an analysis problem. I suspect that optimizer is messing up the query. Could you pinpoint the rule that causes the issue? Tip: in the spark shell ( |
|
ok, the optimizer rule apply as follows:
here
col#6 should not be existed after using col#9 to replace col#6 in this RemoveAliasOnlyProject.
here
col#9 has no attribute to bind when do BindRefference
|
|
Test build #70048 has finished for PR 16255 at commit
|
|
@hvanhovell please help to review this, thanks a lot! |
| case p @ Project(_, child) if sameOutput(child.output, p.output) => child | ||
|
|
||
| case p @ Project(_, child) if sameOutput(child.output, p.output) => | ||
| if (child.isInstanceOf[CatalogRelation]) p else child |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is a CatalogRelation different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works in your example case, but will fail for a more complex query.
The problem is that the transform on L199 also modifies parts of the tree, that are totally unrelated to the projection at hand. In this case right side of the join. This can currently only happen in case of a self-join on a CTE, such a tree can contain duplicate attribute ids because we expand the tree after we have analyzed the CTE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it will transform the right side of the join, and the problem happened when replace the right side of the join. Above step 5, it apply RemoveAliasOnlyProject rule on step 4's plan, which replace col#6 with col#9, but it does not have effect on Project [col#9 AS col#6] of the right side of the join. I think the right side Project [col#9 AS col#6] should be changed to Project [col#9]
Project [col#9 AS c1#4, col#10 AS c2#5]
+- Join Cross
:- Join Cross
: :- Project
: : +- MetastoreRelation default, p1
: +- MetastoreRelation default, p2
+- !Project [col#9 AS col#10]
+- Join Cross
:- Project
: +- MetastoreRelation default, p1
+- Project [col#9 AS col#6]
+- MetastoreRelation default, p2
|
Test build #70114 has finished for PR 16255 at commit
|
| } | ||
| val attrMap = AttributeMap(attributesToReplace) | ||
| plan transform { | ||
| case plan: Project if plan eq proj => plan.child |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is: this rule assumes that, if we find an alias-only project, e.g. alias a#1 to a#2, it's safe to remove this project and replace all a#2 with a#1 in this plan. However, this is not true for complex cases like https://github.com/apache/spark/pull/16255/files#diff-1ea02a6fab84e938582f7f87cc4d9ea1R2023 .
Let's see if there is a way to fix this problem entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a naive way to do this is, make sure we only replace attributes in the ancestor nodes of the alias-only project:
plan transform {
case plan: Project if plan eq proj => plan.child
case plan if plan.collect { case p if p eq project }.nonEmpty => // do the replace
}
It's very inefficient, maybe we can improve TreeNode to maintain the parent-child relationship between nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is safe to only replace attributes in the ancestor nodes.
Alias with the same exprId but not the same object, replace the alias with it's child. it is not safe ,right?
Project [col#9 AS col#6] -> Project [col#9]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| case plan => plan transformExpressions { | ||
| case a: Attribute if attrMap.contains(a) => attrMap(a) | ||
| case b: Alias if attrMap.exists(_._1.exprId == b.exprId) | ||
| && b.child.isInstanceOf[NamedExpression] => b.child |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you reason about this? why we treat Alias differently here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you said, if we find an alias-only project, e.g. alias a#1 to a#2, it's safe to remove this project and replace all a#2 with a#1 in this plan. So another Alias which is also alias a#1 to a#2, but not the same object with the first one, it will not be processed.
here, the logic shows that we process the situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get it, for alias a#1 to a#2, we wanna replace all a#2 with a#1, so we will do nothing for alias a#1 to a#2, because we can't find an attribute a#2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RemoveAliasOnlyProject will remove alias a#1 to a#2, and replace all a#2 with a#1, so there is no a#2 exists, If we do nothing for alias a#1 to a#2(not the same object with the removed one), it will cause the exception situation from step 5 to step 6 showed on the above comment.
@cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know how the failure happens and this can fix it, but I think it's too hacky and does not catch the root cause. https://github.com/apache/spark/pull/16255/files#r92348878 easily explains why the failure happens and how to fix it, can you make other people understand your fix easily?
…timizer ## What changes were proposed in this pull request? The optimizer tries to remove redundant alias only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies removes such a project and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE) and the duplicated part contains the alias only project, in this case the rewrite will break the tree. This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan. The current tree transformation infrastructure works very well if the transformation at hand requires little or a global contextual information. In this case we need to know both the attributes that were not to be moved, and we also needed to know which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involves transversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan`to make this much more straightforward; this basically allows you to manually traverse the tree. This PR subsumes the following PRs by windpiger: Closes apache#16267 Closes apache#16255 ## How was this patch tested? I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have added integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases. Author: Herman van Hovell <[email protected]> Closes apache#16757 from hvanhovell/SPARK-18609.
What changes were proposed in this pull request?
After optimize the SQL with CTE and Join between two tables with the same column name, there will throw a exception when the Physical Plan executed.
SQL:
before fixed the SQL explained:
when execute the Physical Plan above, it will throw exception because of there is no attribute for col#123 to bind( the rules ColumnPruning/RemoveAliasOnlyProject are the root cause):
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: col#123 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:285) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:291) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:291) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275) at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87) at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:61) at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:60) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)after fixed the SQL explained:
and the result is
How was this patch tested?
unit test added