Skip to content

Commit ce7293c

Browse files
viirya authored and gatorsmile committed
[SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans
## What changes were proposed in this pull request?

This is a follow-up of #19050 to handle the `ExistenceJoin` case.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <[email protected]>

Closes #19151 from viirya/SPARK-21835-followup.
1 parent aad2125 commit ce7293c

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
4949
}
5050
}
5151

52-
private def dedupJoin(joinPlan: Join): Join = joinPlan match {
52+
private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
5353
// SPARK-21835: It is possible that the two sides of the join have conflicting attributes,
5454
// the produced join then becomes unresolved and breaks structural integrity. We should
5555
// de-duplicate conflicting attributes. We don't use transformation here because we only
5656
// care about the topmost join converted from correlated predicate subquery.
57-
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
57+
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
5858
val duplicates = right.outputSet.intersect(left.outputSet)
5959
if (duplicates.nonEmpty) {
6060
val aliasMap = AttributeMap(duplicates.map { dup =>
@@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
145145
e transformUp {
146146
case Exists(sub, conditions, _) =>
147147
val exists = AttributeReference("exists", BooleanType, nullable = false)()
148-
newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
148+
// Deduplicate conflicting attributes if any.
149+
newPlan = dedupJoin(
150+
Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
149151
exists
150152
case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
151153
val exists = AttributeReference("exists", BooleanType, nullable = false)()
152154
val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
153155
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
154-
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
156+
// Deduplicate conflicting attributes if any.
157+
newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
155158
exists
156159
}
157160
}

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
938938
}
939939
}
940940
}
941+
942+
test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
943+
val sqlText =
944+
"""
945+
|SELECT * FROM l, r WHERE l.a = r.c + 1 AND
946+
|(EXISTS (SELECT * FROM r) OR l.a = r.c)
947+
""".stripMargin
948+
val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
949+
val join = optimizedPlan.collectFirst { case j: Join => j }.get
950+
assert(join.duplicateResolved)
951+
assert(optimizedPlan.resolved)
952+
}
941953
}

0 commit comments

Comments
 (0)