From b98865127a39bde885f9b1680cfe608629d59d51 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:43:56 -0400 Subject: [PATCH 01/15] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2efa997ff22d2..c3ee6517875c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,6 +1021,16 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e + case l @ LocalLimit(_, child) => + failOnOuterReferenceInSubTree(l, "LIMIT") + l + // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. 
+ case g @ GlobalLimit(_, child) => + failOnOuterReferenceInSubTree(g, "LIMIT") + g case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ff112c51697ad..b78a988eddbb0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -533,5 +533,13 @@ class AnalysisErrorSuite extends AnalysisTest { Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))), LocalRelation(a)) assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil) + + val plan4 = Filter( + Exists( + Limit(1, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) + ), + LocalRelation(a)) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) } } From 069ed8f8e5f14dca7a15701945d42fc27fe82f3c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:50:02 -0400 Subject: [PATCH 02/15] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. 
--- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c3ee6517875c7..357c763f59467 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1022,14 +1022,14 @@ class Analyzer( failOnOuterReferenceInSubTree(e, "an EXPAND") e case l @ LocalLimit(_, child) => - failOnOuterReferenceInSubTree(l, "LIMIT") + failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. case g @ GlobalLimit(_, child) => - failOnOuterReferenceInSubTree(g, "LIMIT") + failOnOuterReferenceInSubTree(g, "a LIMIT") g case p => failOnOuterReference(p) From edca333c081e6d4e53a91b496fba4a3ef4ee89ac Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 20:28:15 -0400 Subject: [PATCH 03/15] New positive test cases --- .../org/apache/spark/sql/SubquerySuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index afed342ff8e2a..52387b4b72a16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -571,4 +571,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) :: Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil) } + + test("SPARK-16804: Correlated subqueries 
containing LIMIT - 1") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 from onerow t2 where t1.c1=t2.c1) + | and exists (select 1 from onerow LIMIT 1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 + | from (select 1 from onerow t2 LIMIT 1) + | where t1.c1=t2.c1)""".stripMargin), + Row(1) :: Nil) + } + } } From 64184fdb77c1a305bb2932e82582da28bb4c0e53 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 1 Aug 2016 09:20:09 -0400 Subject: [PATCH 04/15] Fix unit test case failure --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index b78a988eddbb0..c08de826bd945 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -540,6 +540,6 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) ), LocalRelation(a)) - assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) } } From 29f82b05c9e40e7934397257c674b260a8e8a996 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 13:42:01 -0400 Subject: [PATCH 05/15] blocking TABLESAMPLE --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 +++++-- 
.../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 357c763f59467..9d99c4173d4af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,16 +1021,19 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, child) => + case l @ LocalLimit(_, _) => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, child) => + case g @ GlobalLimit(_, _) => failOnOuterReferenceInSubTree(g, "a LIMIT") g + case s @ Sample(_, _, _, _, _) => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index c08de826bd945..0b7d681be5114 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -541,5 +541,13 @@ class AnalysisErrorSuite extends AnalysisTest { ), LocalRelation(a)) assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + + val plan5 = Filter( + Exists( + Sample(0.0, 0.5, false, 1L, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) + ), + LocalRelation(a)) + assertAnalysisError(plan5, "Accessing 
outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From ac43ab47907a1ccd6d22f920415fbb4de93d4720 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 17:10:19 -0400 Subject: [PATCH 06/15] Fixing code styling --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9d99c4173d4af..29ede7048a2db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,17 +1021,17 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, _) => + case l : LocalLimit => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. 
- case g @ GlobalLimit(_, _) => + case g : GlobalLimit => failOnOuterReferenceInSubTree(g, "a LIMIT") g - case s @ Sample(_, _, _, _, _) => + case s : Sample => failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") s case p => From 631d396031e8bf627eb1f4872a4d3a17c144536c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 14:39:44 -0400 Subject: [PATCH 07/15] Correcting Scala test style --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 0b7d681be5114..8935d979414ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,6 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) + assertAnalysisError(plan5, + "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 7eb9b2dbba3633a1958e38e0019e3ce816300514 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 22:31:09 -0400 Subject: [PATCH 08/15] One (last) attempt to correct the Scala style tests --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8935d979414ae..6438065fb292e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,7 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 0d645125ce24247215374963e80e45c31b25cd14 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 28 Nov 2016 12:49:53 -0500 Subject: [PATCH 09/15] working code #1 --- .../sql/catalyst/analysis/Analyzer.scala | 180 ++++++++++++++---- 1 file changed, 139 insertions(+), 41 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1db44496e67c1..951399afc0eff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -19,12 +19,11 @@ package org.apache.spark.sql.catalyst.analysis import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Generator, _} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.NewInstance import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification @@ -1011,19 +1010,19 @@ class Analyzer( private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { val predicateMap = 
scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] - /** Make sure a plans' subtree does not contain a tagged predicate. */ + // Make sure a plan's subtree does not contain outer references def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = { if (p.collect(predicateMap).nonEmpty) { failAnalysis(s"Accessing outer query column is not allowed in $msg: $p") } } - /** Helper function for locating outer references. */ + // Helper function for locating outer references. def containsOuter(e: Expression): Boolean = { e.find(_.isInstanceOf[OuterReference]).isDefined } - /** Make sure a plans' expressions do not contain a tagged predicate. */ + // Make sure a plan's expressions do not contain outer references def failOnOuterReference(p: LogicalPlan): Unit = { if (p.expressions.exists(containsOuter)) { failAnalysis( @@ -1076,10 +1075,54 @@ class Analyzer( // Simplify the predicates before pulling them out. val transformed = BooleanSimplification(sub) transformUp { - // WARNING: - // Only Filter can host correlated expressions at this time - // Anyone adding a new "case" below needs to add the call to - // "failOnOuterReference" to disallow correlated expressions in it. + + // Whitelist operators allowed in a correlated subquery + // There are 3 categories: + // 1. Operators that are allowed anywhere in a correlated subquery + // and, by definition, they cannot host outer references. + // 2. Operators that are allowed anywhere in a correlated subquery + // so long as they do not host outer references. + // 3. Operators that need special treatment. These operators are + // Project, Filter, Join, Aggregate, Window(?). + // + // Any operators that are not in the above list are allowed + // in a correlated subquery only if they are not on a correlation path. + // In other words, these operators are allowed only under a correlation point. 
+ // + // A correlation path is defined as the sub-tree of all the operators that + // are on the path from the operator hosting the correlated expressions + // up to the operator producing the correlated values. + + // Category 1: + // Leaf node can be anywhere in a correlated subquery. + case n: LeafNode => + n + // Category 2: + // These operators can be anywhere in a correlated subquery. + // so long as they do not host outer references in the operators. + // SubqueryAlias can be anywhere in a correlated subquery. + case n: SubqueryAlias => + failOnOuterReference(n) + n + case n: Distinct => + failOnOuterReference(n) + n + case n: Sort => + failOnOuterReference(n) + n + case n: Repartition => + failOnOuterReference(n) + n + case n: RedistributeData => + failOnOuterReference(n) + n + case n: BroadcastHint => + failOnOuterReference(n) + n + + // Category 3: + // Filter is the ONLY operator allowed to host correlated expressions. + // Filter can be anywhere in a correlated subquery. case f @ Filter(cond, child) => // Find all predicates with an outer reference. val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) @@ -1101,6 +1144,9 @@ class Analyzer( predicateMap += child -> xs child } + + // Project cannot host any correlated expressions + // but can be anywhere in a correlated subquery. case p @ Project(expressions, child) => failOnOuterReference(p) val referencesToAdd = missingReferences(p) @@ -1109,6 +1155,12 @@ class Analyzer( } else { p } + + // Aggregate cannot host any correlated expressions + // It can be on a correlation path if the correlation has + // only equality correlated predicates. + // It cannot be on a correlation path if the correlation has + // non-equality correlated predicates. 
case a @ Aggregate(grouping, expressions, child) => failOnOuterReference(a) failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) @@ -1119,47 +1171,93 @@ class Analyzer( } else { a } - case w : Window => - failOnOuterReference(w) - failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w) - w + + // Join cannot host any correlated expressions. + // Inner join, like Project, can be anywhere. + case j @ Join(_, _, jt, _) if jt.isInstanceOf[InnerLike] => + failOnOuterReference(j) + j + // Right outer join's left operand cannot be on a correlation path. case j @ Join(left, _, RightOuter, _) => failOnOuterReference(j) failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN") j - // SPARK-18578: Do not allow any correlated predicate - // in a Full (Outer) Join operator and its descendants - case j @ Join(_, _, FullOuter, _) => - failOnOuterReferenceInSubTree(j, "a FULL OUTER JOIN") + // Likewise, Left outer join's right operand cannot be on a correlation path. + case j @ Join(_, right, LeftOuter, _) => + failOnOuterReference(j) + failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") j - case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] => + // LeftSemi, LeftAnti and ExistenceJoin are special cases of LeftOuter. + // ExistenceJoin cannot be used externally in both SQL and DataFrame + // so it should not show up here in Analysis phase. + case j @ Join(_, right, LeftSemi, _) => failOnOuterReference(j) failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") j - case u: Union => - failOnOuterReferenceInSubTree(u, "a UNION") - u - case s: SetOperation => - failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT") - s - case e: Expand => - failOnOuterReferenceInSubTree(e, "an EXPAND") - e - case l : LocalLimit => - failOnOuterReferenceInSubTree(l, "a LIMIT") - l - // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) - // and we are walking bottom up, we will fail on LocalLimit before - // reaching GlobalLimit. 
- // The code below is just a safety net. - case g : GlobalLimit => - failOnOuterReferenceInSubTree(g, "a LIMIT") - g - case s : Sample => - failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") - s + case j @ Join(_, right, LeftAnti, _) => + failOnOuterReference(j) + failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") + j + // Any other join types not explicitly listed above, + // including Full outer join, are treated as Category 4. + case j @ Join(_, _, _, _) => + failOnOuterReferenceInSubTree(j, "other JOIN") + j + + // ?? + // Window cannot host any correlated expressions + // Window cannot be on or above any non-equality correlated expressions + case w : Window => + failOnOuterReference(w) + failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w) + w + + // ?? + case n @ Generate(generator, join, _, _, _, _) => + // generator which is a derived class of Expression must not host + // any outer references + if (containsOuter(generator)) { + failOnOuterReference(n) + } else if (join) { + // LATERAL VIEW [OUTER] + failOnOuterReference(n) + } else { + failOnOuterReferenceInSubTree(n, "") + } + n + + /* + // These operators are permitted under a correlation point + // They are not permitted between a correlation point and their outer reference. + case u: Union => + failOnOuterReferenceInSubTree(u, "a UNION") + u + case s: SetOperation => + failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT") + s + case e: Expand => + failOnOuterReferenceInSubTree(e, "an EXPAND") + e + case l : LocalLimit => + failOnOuterReferenceInSubTree(l, "a LIMIT") + l + // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. 
+ case g : GlobalLimit => + failOnOuterReferenceInSubTree(g, "a LIMIT") + g + case s : Sample => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s + */ + // Category 4: Any other operators not in the above 3 categories + // cannot be on a correlation path, that is they are allowed + // under a correlation point but they and their descendant operators + // are not allowed to have any correlated expressions. case p => - failOnOuterReference(p) + failOnOuterReferenceInSubTree(p, "") p } (transformed, predicateMap.values.flatten.toSeq) From 23e357ca3fe77cdad62c25ac2a2d4a5fad0ccb94 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 28 Nov 2016 14:34:50 -0500 Subject: [PATCH 10/15] Make the code more concise --- .../sql/catalyst/analysis/Analyzer.scala | 144 +++++++----------- .../analysis/AnalysisErrorSuite.scala | 4 +- .../org/apache/spark/sql/SubquerySuite.scala | 18 +++ 3 files changed, 74 insertions(+), 92 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fc419c106a7cf..9e5ad546d8347 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1011,9 +1011,9 @@ class Analyzer( val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] // Make sure a plan's subtree does not contain outer references - def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = { + def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { if (p.collect(predicateMap).nonEmpty) { - failAnalysis(s"Accessing outer query column is not allowed in $msg: $p") + failAnalysis(s"Accessing outer query column is not allowed in:\n$p") } } @@ -1027,7 +1027,7 @@ class Analyzer( if (p.expressions.exists(containsOuter)) { failAnalysis( "Expressions referencing the outer query are 
not supported outside of WHERE/HAVING " + - s"clauses: $p") + s"clauses:\n$p") } } @@ -1078,13 +1078,13 @@ class Analyzer( val transformed = BooleanSimplification(sub) transformUp { // Whitelist operators allowed in a correlated subquery - // There are 3 categories: - // 1. Operators that are allowed anywhere in a correlated subquery - // and, by definition, they cannot host outer references. + // There are 4 categories: + // 1. Operators that are allowed anywhere in a correlated subquery, and, + // by definition of the operators, they cannot host outer references. // 2. Operators that are allowed anywhere in a correlated subquery // so long as they do not host outer references. - // 3. Operators that need special treatment. These operators are - // Project, Filter, Join, Aggregate, Window(?). + // 3. Operators that need special handling. These operators are + // Project, Filter, Join, Aggregate, and Generate. // // Any operators that are not in the above list are allowed // in a correlated subquery only if they are not on a correlation path. @@ -1122,8 +1122,8 @@ class Analyzer( n // Category 3: - // Filter is the ONLY operator allowed to host correlated expressions. - // Filter can be anywhere in a correlated subquery. + // Filter is one of the two operators allowed to host correlated expressions. + // The other operator is Join. Filter can be anywhere in a correlated subquery. case f @ Filter(cond, child) => // Find all predicates with an outer reference. val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) @@ -1173,92 +1173,56 @@ class Analyzer( a } - // Join cannot host any correlated expressions. - // Inner join, like Project, can be anywhere. - case j @ Join(_, _, jt, _) if jt.isInstanceOf[InnerLike] => - failOnOuterReference(j) - j - // Right outer join's left operand cannot be on a correlation path. 
- case j @ Join(left, _, RightOuter, _) => - failOnOuterReference(j) - failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN") - j - // Likewise, Left outer join's right operand cannot be on a correlation path. - case j @ Join(_, right, LeftOuter, _) => - failOnOuterReference(j) - failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") - j - // LeftSemi, LeftAnti and ExistenceJoin are special cases of LeftOuter. - // ExistenceJoin cannot be used externally in both SQL and DataFrame - // so it should not show up here in Analysis phase. - case j @ Join(_, right, LeftSemi, _) => - failOnOuterReference(j) - failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") - j - case j @ Join(_, right, LeftAnti, _) => - failOnOuterReference(j) - failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") - j - // Any other join types not explicitly listed above, - // including Full outer join, are treated as Category 4. - case j @ Join(_, _, _, _) => - failOnOuterReferenceInSubTree(j, "other JOIN") + // Join can host correlated expressions. + case j @ Join(left, right, joinType, _) => + joinType match { + // Inner join, like Filter, can be anywhere. + // LeftSemi is a special case of Inner join which returns + // only the first matched row to the right table. + case _: InnerLike | LeftSemi => + failOnOuterReference(j) + + // Left outer join's right operand cannot be on a correlation path. + // LeftAnti and ExistenceJoin are special cases of LeftOuter. + // Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame + // so it should not show up here in Analysis phase. This is just a safety net. + case LeftOuter | LeftAnti | ExistenceJoin(_) => + failOnOuterReference(j) + failOnOuterReferenceInSubTree(right) + + // Likewise, Right outer join's left operand cannot be on a correlation path. 
+ case RightOuter => + failOnOuterReference(j) + failOnOuterReferenceInSubTree(left) + + // Any other join types not explicitly listed above, + // including Full outer join, are treated as Category 4. + case _ => + failOnOuterReferenceInSubTree(j) + } j - // ?? - // Window cannot host any correlated expressions - // Window cannot be on or above any non-equality correlated expressions - case w : Window => - failOnOuterReference(w) - failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w) - w - - // ?? - case n @ Generate(generator, join, _, _, _, _) => - // generator which is a derived class of Expression must not host - // any outer references - if (containsOuter(generator)) { - failOnOuterReference(n) - } else if (join) { - // LATERAL VIEW [OUTER] - failOnOuterReference(n) - } else { - failOnOuterReferenceInSubTree(n, "") - } - n - - /* - // These operators are permitted under a correlation point - // They are not permitted between a correlation point and their outer reference. - case u: Union => - failOnOuterReferenceInSubTree(u, "a UNION") - u - case s: SetOperation => - failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT") - s - case e: Expand => - failOnOuterReferenceInSubTree(e, "an EXPAND") - e - case l : LocalLimit => - failOnOuterReferenceInSubTree(l, "a LIMIT") - l - // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) - // and we are walking bottom up, we will fail on LocalLimit before - // reaching GlobalLimit. - // The code below is just a safety net. - case g : GlobalLimit => - failOnOuterReferenceInSubTree(g, "a LIMIT") - g - case s : Sample => - failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") - s - */ + // Generator with join=true, i.e., expressed with + // LATERAL VIEW [OUTER], similar to inner join, + // allows to have correlation under it + // but must not host any outer references. 
+ case n @ Generate(generator, join, _, _, _, _) if (join) => + if (containsOuter(generator)) { + failOnOuterReference(n) + } + n + + case n @ Generate(_, _, _, _, _, _) => + // Generator with join=false is treated as Category 4. + failOnOuterReferenceInSubTree(n) + n + // Category 4: Any other operators not in the above 3 categories - // cannot be on a correlation path, that is they are allowed + // cannot be on a correlation path, that is they are allowed only // under a correlation point but they and their descendant operators // are not allowed to have any correlated expressions. case p => - failOnOuterReferenceInSubTree(p, "") + failOnOuterReferenceInSubTree(p) p } (transformed, predicateMap.values.flatten.toSeq) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8c1faea2394c6..96aff37a4b4f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -542,7 +542,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) ), LocalRelation(a)) - assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil) val plan5 = Filter( Exists( @@ -551,6 +551,6 @@ class AnalysisErrorSuite extends AnalysisTest { ), LocalRelation(a)) assertAnalysisError(plan5, - "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) + "Accessing outer query column is not allowed in" :: Nil) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 73a53944964fd..0f2f520006e35 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -789,4 +789,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext { } } } + + // Generate operator + test("Correlated subqueries in LATERAL VIEW") { + withTempView("t1", "t2") { + Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1") + Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) + .toDF("c1", "arr_c2").createTempView("t2") + checkAnswer( + sql( + """ + | select c2 + | from t1 + | where exists (select * + | from t2 lateral view explode(arr_c2) q as c2 + where t1.c1 = t2.c1)""".stripMargin), + Row(1) :: Row(0) :: Nil) + } + } } From d60f0def5f0da435a520737cc3bbfae530b58a08 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 28 Nov 2016 14:37:48 -0500 Subject: [PATCH 11/15] Fix stylecheck failure --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9e5ad546d8347..c11addedda0d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} From ca9e1a82e0d9e84e02ed4505562f051ad7c14a5c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 28 Nov 2016 17:39:53 -0500 Subject: [PATCH 12/15] Cosmetic code changes --- .../sql/catalyst/analysis/Analyzer.scala | 
51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c11addedda0d0..f9eb05fe91cba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes -import org.apache.spark.sql.catalyst.expressions.{Generator, _} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.NewInstance import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification @@ -1103,24 +1103,24 @@ class Analyzer( // These operators can be anywhere in a correlated subquery. // so long as they do not host outer references in the operators. // SubqueryAlias can be anywhere in a correlated subquery. 
- case n: SubqueryAlias => - failOnOuterReference(n) - n - case n: Distinct => - failOnOuterReference(n) - n - case n: Sort => - failOnOuterReference(n) - n - case n: Repartition => - failOnOuterReference(n) - n - case n: RedistributeData => - failOnOuterReference(n) - n - case n: BroadcastHint => - failOnOuterReference(n) - n + case p: SubqueryAlias => + failOnOuterReference(p) + p + case p: Distinct => + failOnOuterReference(p) + p + case p: Sort => + failOnOuterReference(p) + p + case p: Repartition => + failOnOuterReference(p) + p + case p: RedistributeData => + failOnOuterReference(p) + p + case p: BroadcastHint => + failOnOuterReference(p) + p // Category 3: // Filter is one of the two operators allowed to host correlated expressions. @@ -1207,16 +1207,13 @@ class Analyzer( // LATERAL VIEW [OUTER], similar to inner join, // allows to have correlation under it // but must not host any outer references. - case n @ Generate(generator, join, _, _, _, _) if (join) => + // Note: + // Generator with join=false is treated as Category 4. + case p @ Generate(generator, join, _, _, _, _) if (join) => if (containsOuter(generator)) { - failOnOuterReference(n) + failOnOuterReference(p) } - n - - case n @ Generate(_, _, _, _, _, _) => - // Generator with join=false is treated as Category 4. 
- failOnOuterReferenceInSubTree(n) - n + p // Category 4: Any other operators not in the above 3 categories // cannot be on a correlation path, that is they are allowed only From 3f4c62afb62f4d922ec0ae037b331e0de8de9193 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 29 Nov 2016 20:35:04 -0500 Subject: [PATCH 13/15] Address review comment #1 --- .../sql/catalyst/analysis/Analyzer.scala | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f9eb05fe91cba..114454a51350c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1013,7 +1013,7 @@ class Analyzer( // Make sure a plan's subtree does not contain outer references def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { - if (p.collect(predicateMap).nonEmpty) { + if (p.collectFirst(predicateMap).nonEmpty) { failAnalysis(s"Accessing outer query column is not allowed in:\n$p") } } @@ -1081,7 +1081,8 @@ class Analyzer( // Whitelist operators allowed in a correlated subquery // There are 4 categories: // 1. Operators that are allowed anywhere in a correlated subquery, and, - // by definition of the operators, they cannot host outer references. + // by definition of the operators, they either do not contain + // any columns or cannot host outer references. // 2. Operators that are allowed anywhere in a correlated subquery // so long as they do not host outer references. // 3. Operators that need special handlings. These operators are @@ -1096,29 +1097,25 @@ class Analyzer( // up to the operator producing the correlated values. // Category 1: - // Leaf node can be anywhere in a correlated subquery. 
- case n: LeafNode => - n - // Category 2: - // These operators can be anywhere in a correlated subquery. - // so long as they do not host outer references in the operators. - // SubqueryAlias can be anywhere in a correlated subquery. - case p: SubqueryAlias => - failOnOuterReference(p) + // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias + case p: BroadcastHint => p case p: Distinct => - failOnOuterReference(p) p - case p: Sort => - failOnOuterReference(p) + case p: LeafNode => p case p: Repartition => - failOnOuterReference(p) p - case p: RedistributeData => + case p: SubqueryAlias => + p + + // Category 2: + // These operators can be anywhere in a correlated subquery. + // so long as they do not host outer references in the operators. + case p: Sort => failOnOuterReference(p) p - case p: BroadcastHint => + case p: RedistributeData => failOnOuterReference(p) p @@ -1151,6 +1148,7 @@ class Analyzer( // but can be anywhere in a correlated subquery. case p @ Project(expressions, child) => failOnOuterReference(p) + val referencesToAdd = missingReferences(p) if (referencesToAdd.nonEmpty) { Project(expressions ++ referencesToAdd, child) @@ -1159,7 +1157,7 @@ class Analyzer( } // Aggregate cannot host any correlated expressions - // It can be on a correlation path if the correlation has + // It can be on a correlation path if the correlation contains // only equality correlated predicates. // It cannot be on a correlation path if the correlation has // non-equality correlated predicates. @@ -1209,10 +1207,8 @@ class Analyzer( // but must not host any outer references. // Note: // Generator with join=false is treated as Category 4. 
- case p @ Generate(generator, join, _, _, _, _) if (join) => - if (containsOuter(generator)) { - failOnOuterReference(p) - } + case p @ Generate(generator, true, _, _, _, _) => + failOnOuterReference(p) p // Category 4: Any other operators not in the above 3 categories From 1d329587009903f8b825ab1297e65eaaec5f646e Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 29 Nov 2016 20:40:27 -0500 Subject: [PATCH 14/15] Remove the extra space --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 37f0c8ed19d37..75d9997582aa6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { - case _: InnerLike | LeftSemi => + case _: InnerLike | LeftSemi => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. 
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) From 0c9d0b5e260a8c2994efa79efaa343a1308d2054 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 1 Dec 2016 15:15:22 -0500 Subject: [PATCH 15/15] Move LeftSemi to be the same group as LeftOuter --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 114454a51350c..ce6ee4e7b6b43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1176,16 +1176,18 @@ class Analyzer( case j @ Join(left, right, joinType, _) => joinType match { // Inner join, like Filter, can be anywhere. - // LeftSemi is a special case of Inner join which returns - // only the first matched row to the right table. - case _: InnerLike | LeftSemi => + case _: InnerLike => failOnOuterReference(j) // Left outer join's right operand cannot be on a correlation path. // LeftAnti and ExistenceJoin are special cases of LeftOuter. // Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame // so it should not show up here in Analysis phase. This is just a safety net. - case LeftOuter | LeftAnti | ExistenceJoin(_) => + // + // LeftSemi does not allow output from the right operand. + // Any correlated references in the subplan + // of the right operand cannot be pulled up. + case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => failOnOuterReference(j) failOnOuterReferenceInSubTree(right)