From 97837a46b790ceb1f0df38cc7a3094b1cb4eb556 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Jul 2018 07:44:43 +0000 Subject: [PATCH 1/8] Resolved references from Dataset should be checked if it is missed from plan. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 +++-- .../sql/catalyst/plans/logical/LogicalPlan.scala | 3 ++- .../plans/logical/basicLogicalOperators.scala | 3 ++- .../spark/sql/catalyst/plans/LogicalPlanSuite.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 11 +++++++++++ 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e187133d03b17..84ca22b8c999c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1149,7 +1149,7 @@ class Analyzer( private def resolveExprsAndAddMissingAttrs( exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = { - if (exprs.forall(_.resolved)) { + if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) { // All given expressions are resolved, no need to continue anymore. (exprs, plan) } else { @@ -1163,7 +1163,8 @@ class Analyzer( case p: Project => val maybeResolvedExprs = exprs.map(resolveExpression(_, p)) val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child) - val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs) + val missingAttrs = AttributeSet(newExprs) -- + AttributeSet(maybeResolvedExprs.filter(_.references.subsetOf(p.outputSet))) (newExprs, Project(p.projectList ++ missingAttrs, newChild)) case a @ Aggregate(groupExprs, aggExprs, child) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index c486ad700f362..65cc55d709977 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -60,7 +60,8 @@ abstract class LogicalPlan * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]] * should return `false`). */ - lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved + lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved && + missingInput.isEmpty override protected def statePrefix = if (!resolved) "'" else super.statePrefix diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 3bf32ef7884e5..d2cf163c82321 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -60,7 +60,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) }.nonEmpty ) - !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions + !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions && + missingInput.isEmpty } override def validConstraints: Set[Expression] = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index bf569cb869428..c772c305cb1db 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -119,4 +119,16 @@ class LogicalPlanSuite extends SparkFunSuite { OneRowRelation()) assert(result.sameResult(expected)) } + + test("Logical plan with missing inputs should be unresolved") { + // Normally we won't add a missing resolved reference into a logical plan, + // but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query + // like this. + val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)()) + val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation) + assert(plan.expressions.forall(_.resolved)) + assert(plan.childrenResolved) + assert(plan.missingInput.nonEmpty) + assert(!plan.resolved) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index ea00d22bff001..5b9193d1495b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2329,4 +2329,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1)) checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1)) } + + test("SPARK-24781: Using a reference from Dataset in Filter/Sort might not work") { + val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id") + val filter1 = df.select(df("name")).filter(df("id") === 0) + val filter2 = df.select(col("name")).filter(col("id") === 0) + checkAnswer(filter1, filter2.collect()) + + val sort1 = df.select(df("name")).orderBy(df("id")) + val sort2 = df.select(col("name")).orderBy(col("id")) + checkAnswer(sort1, sort2.collect()) + } } From b99d0c7ed2fb9932a3bec661b65598c77a8ad5f3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Jul 2018 03:02:43 +0000 Subject: [PATCH 2/8] Address comments and fix bug. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 +++++++----- .../sql/catalyst/plans/logical/LogicalPlan.scala | 3 +-- .../plans/logical/basicLogicalOperators.scala | 3 +-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 84ca22b8c999c..3c00db6d9350a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1125,7 +1125,8 @@ class Analyzer( case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa case sa @ Sort(_, _, child: Aggregate) => sa - case s @ Sort(order, _, child) if !s.resolved && child.resolved => + case s @ Sort(order, _, child) + if (!s.resolved || s.missingInput.nonEmpty) && child.resolved => val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child) val ordering = newOrder.map(_.asInstanceOf[SortOrder]) if (child.output == newChild.output) { @@ -1136,7 +1137,7 @@ class Analyzer( Project(child.output, newSort) } - case f @ Filter(cond, child) if !f.resolved && child.resolved => + case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved => val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child) if (child.output == newChild.output) { f.copy(condition = newCond.head) @@ -1149,8 +1150,9 @@ class Analyzer( private def resolveExprsAndAddMissingAttrs( exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = { + // An expression is possibly resolved but not in the output of `plan`. if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) { - // All given expressions are resolved, no need to continue anymore. + // All given expressions are resolved and in the plan's output, no need to continue anymore. (exprs, plan) } else { plan match { @@ -1163,8 +1165,8 @@ class Analyzer( case p: Project => val maybeResolvedExprs = exprs.map(resolveExpression(_, p)) val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child) - val missingAttrs = AttributeSet(newExprs) -- - AttributeSet(maybeResolvedExprs.filter(_.references.subsetOf(p.outputSet))) + // The resolved attributes might not come from `p.child`. Need to filter it. + val missingAttrs = (AttributeSet(newExprs).intersect(p.child.outputSet)) -- p.outputSet (newExprs, Project(p.projectList ++ missingAttrs, newChild)) case a @ Aggregate(groupExprs, aggExprs, child) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 65cc55d709977..c486ad700f362 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -60,8 +60,7 @@ abstract class LogicalPlan * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]] * should return `false`). */ - lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved && - missingInput.isEmpty + lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved override protected def statePrefix = if (!resolved) "'" else super.statePrefix diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d2cf163c82321..3bf32ef7884e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -60,8 +60,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) }.nonEmpty ) - !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions && - missingInput.isEmpty + !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions } override def validConstraints: Set[Expression] = From 38a935df69b0ad34fd648e69d0f4c9c6b5a3dd0e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Jul 2018 06:51:21 +0000 Subject: [PATCH 3/8] Fix bug. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3c00db6d9350a..03f12692f5c06 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1165,8 +1165,8 @@ class Analyzer( case p: Project => val maybeResolvedExprs = exprs.map(resolveExpression(_, p)) val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child) - // The resolved attributes might not come from `p.child`. Need to filter it. - val missingAttrs = (AttributeSet(newExprs).intersect(p.child.outputSet)) -- p.outputSet + // Only add missing attributes coming from `newChild`. + val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet) (newExprs, Project(p.projectList ++ missingAttrs, newChild)) case a @ Aggregate(groupExprs, aggExprs, child) => From 6eda8d2e8e56a9d9027d6cac49d23684396cb0ea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Jul 2018 07:04:27 +0000 Subject: [PATCH 4/8] Remove added test. --- .../spark/sql/catalyst/plans/LogicalPlanSuite.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index c772c305cb1db..bf569cb869428 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -119,16 +119,4 @@ class LogicalPlanSuite extends SparkFunSuite { OneRowRelation()) assert(result.sameResult(expected)) } - - test("Logical plan with missing inputs should be unresolved") { - // Normally we won't add a missing resolved reference into a logical plan, - // but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query - // like this. - val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)()) - val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation) - assert(plan.expressions.forall(_.resolved)) - assert(plan.childrenResolved) - assert(plan.missingInput.nonEmpty) - assert(!plan.resolved) - } } From 8432b00ab737646217a0191ad3661ef95af094cc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Jul 2018 08:14:04 +0000 Subject: [PATCH 5/8] Add more tests and deal with aggregate. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 ++++++-- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 10 ++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 58d81a86ccf7c..51636d54215d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1176,7 +1176,7 @@ class Analyzer( case a @ Aggregate(groupExprs, aggExprs, child) => val maybeResolvedExprs = exprs.map(resolveExpression(_, a)) val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, child) - val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs) + val missingAttrs = (AttributeSet(newExprs) -- a.outputSet).intersect(newChild.outputSet) if (missingAttrs.forall(attr => groupExprs.exists(_.semanticEquals(attr)))) { // All the missing attributes are grouping expressions, valid case. (newExprs, a.copy(aggregateExpressions = aggExprs ++ missingAttrs, child = newChild)) @@ -1496,7 +1496,11 @@ class Analyzer( // Try resolving the ordering as though it is in the aggregate clause. try { - val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s)) + // If a sort order is unresolved, containing references not in aggregate, or containing + // `AggregateExpression`, we need to push down it to the underlying aggregate operator. + val unresolvedSortOrders = sortOrder.filter { s => + !s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s) + } val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 05a4c9714016e..f91ce434020b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2397,5 +2397,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val sort1 = df.select(df("name")).orderBy(df("id")) val sort2 = df.select(col("name")).orderBy(col("id")) checkAnswer(sort1, sort2.collect()) + + withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") { + val aggPlusSort1 = df.groupBy(df("name")).agg(count(df("name"))).orderBy(df("name")) + val aggPlusSort2 = df.groupBy(col("name")).agg(count(col("name"))).orderBy(col("name")) + checkAnswer(aggPlusSort1, aggPlusSort2.collect()) + + val aggPlusFilter1 = df.groupBy(df("name")).agg(count(df("name"))).filter(df("name") === 0) + val aggPlusFilter2 = df.groupBy(col("name")).agg(count(col("name"))).filter(col("name") === 0) + checkAnswer(aggPlusFilter1, aggPlusFilter2.collect()) + } } } From 860d433127ef691034b4c5e2135780679312939e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 12 Jul 2018 12:58:47 +0000 Subject: [PATCH 6/8] Add comments. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 51636d54215d2..157e4f14ad1c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1152,11 +1152,17 @@ class Analyzer( } } + /** + * This method tries to resolve expressions and find missing attributes recursively. Specially, + * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved + * attributes which are missed from SELECT clause. This method tries to find the missing + * attributes out and add into the projection. + */ private def resolveExprsAndAddMissingAttrs( exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = { - // An expression is possibly resolved but not in the output of `plan`. + // Missing attributes can be unresolved attributes or resolved attributes which are not in + // the output attributes of the plan. if (exprs.forall(e => e.resolved && e.references.subsetOf(plan.outputSet))) { - // All given expressions are resolved and in the plan's output, no need to continue anymore. (exprs, plan) } else { plan match { @@ -1167,9 +1173,12 @@ class Analyzer( (newExprs, AnalysisBarrier(newChild)) case p: Project => + // Resolving expressions against current plan. val maybeResolvedExprs = exprs.map(resolveExpression(_, p)) + // Recursively resolving expressions on the child of current plan. val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child) - // Only add missing attributes coming from `newChild`. + // If some attributes used by expressions are resolvable only on the rewritten child + // plan, we need to add them into original projection. val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet) (newExprs, Project(p.projectList ++ missingAttrs, newChild)) From a98f4161c682b90755e9599a437241dcaeb388b5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 13 Jul 2018 00:36:53 +0000 Subject: [PATCH 7/8] Update comment. --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 157e4f14ad1c8..83eb52d05af64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1155,7 +1155,7 @@ class Analyzer( /** * This method tries to resolve expressions and find missing attributes recursively. Specially, * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved - * attributes which are missed from SELECT clause. This method tries to find the missing + * attributes which are missed from child output. This method tries to find the missing * attributes out and add into the projection. */ private def resolveExprsAndAddMissingAttrs( From 9e00db938ddc6293899170e19b41530b22fb525a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 13 Jul 2018 04:59:15 +0000 Subject: [PATCH 8/8] Split original test case to two test cases. --- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f91ce434020b1..5babdf6f33b99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2388,7 +2388,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1)) } - test("SPARK-24781: Using a reference from Dataset in Filter/Sort might not work") { + test("SPARK-24781: Using a reference from Dataset in Filter/Sort") { val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id") val filter1 = df.select(df("name")).filter(df("id") === 0) val filter2 = df.select(col("name")).filter(col("id") === 0) @@ -2397,8 +2397,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val sort1 = df.select(df("name")).orderBy(df("id")) val sort2 = df.select(col("name")).orderBy(col("id")) checkAnswer(sort1, sort2.collect()) + } + + test("SPARK-24781: Using a reference not in aggregation in Filter/Sort") { + withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") { + val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id") - withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") { val aggPlusSort1 = df.groupBy(df("name")).agg(count(df("name"))).orderBy(df("name")) val aggPlusSort2 = df.groupBy(col("name")).agg(count(col("name"))).orderBy(col("name")) checkAnswer(aggPlusSort1, aggPlusSort2.collect())