Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b988651
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
069ed8f
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
edca333
New positive test cases
nsyca Jul 30, 2016
64184fd
Fix unit test case failure
nsyca Aug 1, 2016
29f82b0
blocking TABLESAMPLE
nsyca Aug 5, 2016
ac43ab4
Fixing code styling
nsyca Aug 5, 2016
631d396
Correcting Scala test style
nsyca Aug 7, 2016
7eb9b2d
One (last) attempt to correct the Scala style tests
nsyca Aug 8, 2016
1387cf5
Merge remote-tracking branch 'upstream/master'
nsyca Aug 12, 2016
6d9bade
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
9a1f80b
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
3fe9429
Merge remote-tracking branch 'upstream/master'
nsyca Nov 5, 2016
0757b81
Merge remote-tracking branch 'upstream/master'
nsyca Nov 11, 2016
35b77f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 12, 2016
c63b8c6
Merge remote-tracking branch 'upstream/master'
nsyca Nov 14, 2016
f3351d5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
9fc5c33
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
402e1d9
Merge remote-tracking branch 'upstream/master'
nsyca Nov 22, 2016
b117281
Merge remote-tracking branch 'upstream/master'
nsyca Nov 23, 2016
3023399
Merge remote-tracking branch 'upstream/master'
nsyca Nov 24, 2016
4b692f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 25, 2016
0d64512
working code #1
nsyca Nov 28, 2016
c8aadb5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
3f184ea
Merge branch 'master' into spark18455.0
nsyca Nov 28, 2016
23e357c
Make the code more concise
nsyca Nov 28, 2016
d60f0de
Fix stylecheck failure
nsyca Nov 28, 2016
2181647
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
599f54b
Merge branch 'master' into spark18455.0
nsyca Nov 28, 2016
ca9e1a8
Cosmetic code changes
nsyca Nov 28, 2016
3f4c62a
Address review comment #1
nsyca Nov 30, 2016
c8588de
Merge remote-tracking branch 'upstream/master'
nsyca Nov 30, 2016
05fd7a3
Merge branch 'master' into spark18455.0
nsyca Nov 30, 2016
1d32958
Remove the extra space
nsyca Nov 30, 2016
0c9d0b5
Move LeftSemi to be the same group as LeftOuter
nsyca Dec 1, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1011,24 +1011,24 @@ class Analyzer(
private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]

/** Make sure a plans' subtree does not contain a tagged predicate. */
def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = {
if (p.collect(predicateMap).nonEmpty) {
failAnalysis(s"Accessing outer query column is not allowed in $msg: $p")
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (p.collectFirst(predicateMap).nonEmpty) {
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
}
}

/** Helper function for locating outer references. */
// Helper function for locating outer references.
def containsOuter(e: Expression): Boolean = {
  // Search the expression tree for any node that is an OuterReference.
  val firstOuter = e.find {
    case _: OuterReference => true
    case _ => false
  }
  firstOuter.isDefined
}

/** Make sure a plans' expressions do not contain a tagged predicate. */
// Make sure a plan's expressions do not contain outer references
def failOnOuterReference(p: LogicalPlan): Unit = {
if (p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses: $p")
s"clauses:\n$p")
}
}

Expand Down Expand Up @@ -1077,10 +1077,51 @@ class Analyzer(

// Simplify the predicates before pulling them out.
val transformed = BooleanSimplification(sub) transformUp {
// WARNING:
// Only Filter can host correlated expressions at this time
// Anyone adding a new "case" below needs to add the call to
// "failOnOuterReference" to disallow correlated expressions in it.

// Whitelist operators allowed in a correlated subquery
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
// by definition of the operators, they either do not contain
// any columns or cannot host outer references.
// 2. Operators that are allowed anywhere in a correlated subquery
// so long as they do not host outer references.
// 3. Operators that need special handling. These operators are
// Project, Filter, Join, Aggregate, and Generate.
// 4. Any other operators not listed in the categories above. They are allowed
// in a correlated subquery only if they are not on a correlation path.
// In other words, these operators are allowed only under a correlation point.
//
// A correlation path is defined as the sub-tree of all the operators that
// are on the path from the operator hosting the correlated expressions
// up to the operator producing the correlated values.

// Category 1:
// BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
case p: BroadcastHint =>
p
case p: Distinct =>
p
case p: LeafNode =>
p
case p: Repartition =>
p
case p: SubqueryAlias =>
p

// Category 2:
// These operators can be anywhere in a correlated subquery,
// so long as they do not host outer references.
case p: Sort =>
failOnOuterReference(p)
p
case p: RedistributeData =>
failOnOuterReference(p)
p

// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
// The other operator is Join. Filter can be anywhere in a correlated subquery.
case f @ Filter(cond, child) =>
// Find all predicates with an outer reference.
val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
Expand All @@ -1102,14 +1143,24 @@ class Analyzer(
predicateMap += child -> xs
child
}

// Project cannot host any correlated expressions
// but can be anywhere in a correlated subquery.
case p @ Project(expressions, child) =>
failOnOuterReference(p)

val referencesToAdd = missingReferences(p)
if (referencesToAdd.nonEmpty) {
Project(expressions ++ referencesToAdd, child)
} else {
p
}

// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
// only equality correlated predicates.
// It cannot be on a correlation path if the correlation contains
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: has -> contains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change.

// non-equality correlated predicates.
case a @ Aggregate(grouping, expressions, child) =>
failOnOuterReference(a)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
Expand All @@ -1120,48 +1171,55 @@ class Analyzer(
} else {
a
}
case w : Window =>
failOnOuterReference(w)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w)
w
case j @ Join(left, _, RightOuter, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
j
// SPARK-18578: Do not allow any correlated predicate
// in a Full (Outer) Join operator and its descendants
case j @ Join(_, _, FullOuter, _) =>
failOnOuterReferenceInSubTree(j, "a FULL OUTER JOIN")
j
case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")

// Join can host correlated expressions.
case j @ Join(left, right, joinType, _) =>
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
failOnOuterReference(j)

// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
// Note that ExistenceJoin cannot be expressed externally in either SQL
// or DataFrame, so it should not show up here in the Analysis phase.
// This is just a safety net.
//
// LeftSemi does not allow output from the right operand.
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right)

// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left)

// Any other join types not explicitly listed above,
// including Full outer join, are treated as Category 4.
case _ =>
failOnOuterReferenceInSubTree(j)
}
j
case u: Union =>
failOnOuterReferenceInSubTree(u, "a UNION")
u
case s: SetOperation =>
failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT")
s
case e: Expand =>
failOnOuterReferenceInSubTree(e, "an EXPAND")
e
case l : LocalLimit =>
failOnOuterReferenceInSubTree(l, "a LIMIT")
l
// Since LIMIT <n> is represented as GlobalLimit(<n>, (LocalLimit (<n>, child))
// and we are walking bottom up, we will fail on LocalLimit before
// reaching GlobalLimit.
// The code below is just a safety net.
case g : GlobalLimit =>
failOnOuterReferenceInSubTree(g, "a LIMIT")
g
case s : Sample =>
failOnOuterReferenceInSubTree(s, "a TABLESAMPLE")
s
case p =>

// Generator with join=true, i.e., expressed with
// LATERAL VIEW [OUTER], similar to inner join,
// allows correlation under it
// but must not host any outer references.
// Note:
// Generator with join=false is treated as Category 4.
case p @ Generate(generator, true, _, _, _, _) =>
failOnOuterReference(p)
p

// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
// under a correlation point but they and their descendant operators
// are not allowed to have any correlated expressions.
case p =>
failOnOuterReferenceInSubTree(p)
p
}
(transformed, predicateMap.values.flatten.toSeq)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

joinType match {
case _: InnerLike | LeftSemi =>
case _: InnerLike | LeftSemi =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ class AnalysisErrorSuite extends AnalysisTest {
Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))
),
LocalRelation(a))
assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil)
assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil)

val plan5 = Filter(
Exists(
Expand All @@ -551,6 +551,6 @@ class AnalysisErrorSuite extends AnalysisTest {
),
LocalRelation(a))
assertAnalysisError(plan5,
"Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil)
"Accessing outer query column is not allowed in" :: Nil)
}
}
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -789,4 +789,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}
}
}

// Generate operator
test("Correlated subqueries in LATERAL VIEW") {
  withTempView("t1", "t2") {
    // Outer table: two rows keyed by c1 = 1 and c1 = 2.
    Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1")
    // Inner table: one non-empty array per key, so explode() yields rows for both keys.
    // Registered with createOrReplaceTempView for consistency with t1.
    Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3)))
      .toDF("c1", "arr_c2").createOrReplaceTempView("t2")
    // The correlated predicate t1.c1 = t2.c1 sits above the Generate operator
    // produced by LATERAL VIEW. Both t1 rows have a match in t2, so EXISTS holds
    // for each and the outer query returns t1.c2 for both rows.
    // Every line of the SQL literal carries the '|' margin so stripMargin
    // treats all lines uniformly.
    checkAnswer(
      sql(
        """
          | select c2
          | from t1
          | where exists (select *
          | from t2 lateral view explode(arr_c2) q as c2
          | where t1.c1 = t2.c1)""".stripMargin),
      Row(1) :: Row(0) :: Nil)
  }
}
}