From 5e93654307a925fecef4005956c5791a807ccff3 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 21 Aug 2015 16:38:47 -0700 Subject: [PATCH 1/9] [SPARK-10165][SQL] Await child resolution in ResolveFunctions --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d0eb9c2c90bdf..822396ca95116 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -515,6 +515,7 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case q: LogicalPlan => q transformExpressions { + case u if !u.childrenResolved => u // Skip until children are resolved. case u @ UnresolvedFunction(name, children, isDistinct) => withPosition(u) { registry.lookupFunction(name, children) match { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 10f2902e5eef0..b03a35132325d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -276,6 +276,11 @@ class HiveUDFSuite extends QueryTest { checkAnswer( sql("SELECT testStringStringUDF(\"hello\", s) FROM stringTable"), Seq(Row("hello world"), Row("hello goodbye"))) + + checkAnswer( + sql("SELECT testStringStringUDF(\"\", testStringStringUDF(\"hello\", s)) FROM stringTable"), + Seq(Row(" hello world"), Row(" hello goodbye"))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF") TestHive.reset() From ca76997c46a9188460edc84b8ef1b3be68db2f94 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 23 Aug 2015 17:14:35 -0700 Subject: [PATCH 2/9] handle sorts --- .../sql/catalyst/analysis/Analyzer.scala | 58 ++++++++++++++++--- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 822396ca95116..1891c09f1de33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -566,15 +566,59 @@ class Analyzer( */ object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case filter @ Filter(havingCondition, aggregate @ Aggregate(_, originalAggExprs, _)) - if aggregate.resolved && containsAggregate(havingCondition) => + case filter @ Filter(havingCondition, + aggregate @ Aggregate(grouping, originalAggExprs, child)) + if aggregate.resolved && !filter.resolved => + + // Try resolving the condition of the filter as though it is in the aggregate clause + val aggregatedCondition = Aggregate(grouping, Alias(havingCondition, "")() :: Nil, child) + val resolvedOperator = execute(aggregatedCondition) + def resolvedAggregateFilter = + resolvedOperator + .asInstanceOf[Aggregate] + .aggregateExpressions.head + .children.head // Strip alias + + // If resolution was successful and we see the filter has an aggregate in it, add it to + // the original aggregate operator. + if (resolvedOperator.resolved && containsAggregate(resolvedAggregateFilter)) { + val evaluatedCondition = Alias(resolvedAggregateFilter, "havingCondition")() + val aggExprsWithHaving = evaluatedCondition +: originalAggExprs + + Project(aggregate.output, + Filter(evaluatedCondition.toAttribute, + aggregate.copy(aggregateExpressions = aggExprsWithHaving))) + } else { + plan + } + + case sort @ Sort(sortOrder, global, + aggregate @ Aggregate(grouping, originalAggExprs, child)) + if aggregate.resolved && !sort.resolved => + + // Try resolving the condition of the filter as though it is in the aggregate clause + val aliasedOrder = sortOrder.map(o => Alias(o.child, "aggOrder")()) + val aggregatedCondition = Aggregate(grouping, aliasedOrder, child) + val resolvedOperator: Aggregate = execute(aggregatedCondition).asInstanceOf[Aggregate] + def resolvedAggregateOrdering: Seq[NamedExpression] = resolvedOperator.aggregateExpressions - val evaluatedCondition = Alias(havingCondition, "havingCondition")() - val aggExprsWithHaving = evaluatedCondition +: originalAggExprs + val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) - Project(aggregate.output, - Filter(evaluatedCondition.toAttribute, - aggregate.copy(aggregateExpressions = aggExprsWithHaving))) + // If resolution was successful and we see the filter has an aggregate in it, add it to + // the original aggregate operator. + if (resolvedOperator.resolved && needsAggregate) { + val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { + case (order, evaluated) => order.copy(child = evaluated.toAttribute) + } + val aggExprsWithHaving: Seq[NamedExpression] = + resolvedAggregateOrdering ++ originalAggExprs + + Project(aggregate.output, + Sort(evaluatedOrderings, global, + aggregate.copy(aggregateExpressions = aggExprsWithHaving))) + } else { + plan + } } protected def containsAggregate(condition: Expression): Boolean = { From e5ea534886b7adc08d68f567e3d755fde38fb104 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 23 Aug 2015 18:19:03 -0700 Subject: [PATCH 3/9] remove duplicate rules --- .../sql/catalyst/analysis/Analyzer.scala | 54 +++++-------------- 1 file changed, 12 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1891c09f1de33..cff74f1b56641 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -78,7 +78,7 @@ class Analyzer( ResolveAliases :: ExtractWindowExpressions :: GlobalAggregates :: - UnresolvedHavingClauseAttributes :: + ResolveAggregateFunctions :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, @@ -452,37 +452,6 @@ class Analyzer( logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}") s // Nothing we can do here. Return original plan. } - case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child)) - if !s.resolved && a.resolved => - // A small hack to create an object that will allow us to resolve any references that - // refer to named expressions that are present in the grouping expressions. - val groupingRelation = LocalRelation( - grouping.collect { case ne: NamedExpression => ne.toAttribute } - ) - - // Find sort attributes that are projected away so we can temporarily add them back in. - val (newOrdering, missingAttr) = resolveAndFindMissing(ordering, a, groupingRelation) - - // Find aggregate expressions and evaluate them early, since they can't be evaluated in a - // Sort. - val (withAggsRemoved, aliasedAggregateList) = newOrdering.map { - case aggOrdering if aggOrdering.collect { case a: AggregateExpression => a }.nonEmpty => - val aliased = Alias(aggOrdering.child, "_aggOrdering")() - (aggOrdering.copy(child = aliased.toAttribute), Some(aliased)) - - case other => (other, None) - }.unzip - - val missing = missingAttr ++ aliasedAggregateList.flatten - - if (missing.nonEmpty) { - // Add missing grouping exprs and then project them away after the sort. - Project(a.output, - Sort(withAggsRemoved, global, - Aggregate(grouping, aggs ++ missing, child))) - } else { - s // Nothing we can do here. Return original plan. - } } /** @@ -560,33 +529,32 @@ class Analyzer( } /** - * This rule finds expressions in HAVING clause filters that depend on - * unresolved attributes. It pushes these expressions down to the underlying - * aggregates and then projects them away above the filter. + * This rule finds aggregate expressions that are not in an aggregate operator. For example, + * those in a HAVING clause or ORDER BY clause. These expressions are pushed down to the + * underlying aggregate operator and then projected away after the original operator. */ - object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] { + object ResolveAggregateFunctions extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case filter @ Filter(havingCondition, aggregate @ Aggregate(grouping, originalAggExprs, child)) if aggregate.resolved && !filter.resolved => // Try resolving the condition of the filter as though it is in the aggregate clause - val aggregatedCondition = Aggregate(grouping, Alias(havingCondition, "")() :: Nil, child) + val aggregatedCondition = + Aggregate(grouping, Alias(havingCondition, "havingCondition")() :: Nil, child) val resolvedOperator = execute(aggregatedCondition) def resolvedAggregateFilter = resolvedOperator .asInstanceOf[Aggregate] .aggregateExpressions.head - .children.head // Strip alias // If resolution was successful and we see the filter has an aggregate in it, add it to // the original aggregate operator. if (resolvedOperator.resolved && containsAggregate(resolvedAggregateFilter)) { - val evaluatedCondition = Alias(resolvedAggregateFilter, "havingCondition")() - val aggExprsWithHaving = evaluatedCondition +: originalAggExprs + val aggExprsWithHaving = resolvedAggregateFilter +: originalAggExprs Project(aggregate.output, - Filter(evaluatedCondition.toAttribute, + Filter(resolvedAggregateFilter.toAttribute, aggregate.copy(aggregateExpressions = aggExprsWithHaving))) } else { plan @@ -603,10 +571,12 @@ class Analyzer( def resolvedAggregateOrdering: Seq[NamedExpression] = resolvedOperator.aggregateExpressions val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) + val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) + val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty // If resolution was successful and we see the filter has an aggregate in it, add it to // the original aggregate operator. - if (resolvedOperator.resolved && needsAggregate) { + if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { case (order, evaluated) => order.copy(child = evaluated.toAttribute) } From 827688eb6a85ef70a6b449cf3aecd054b287e7d5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 23 Aug 2015 22:04:52 -0700 Subject: [PATCH 4/9] fix fallback --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index cff74f1b56641..4687c1e07dcf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -557,7 +557,7 @@ class Analyzer( Filter(resolvedAggregateFilter.toAttribute, aggregate.copy(aggregateExpressions = aggExprsWithHaving))) } else { - plan + filter } case sort @ Sort(sortOrder, global, @@ -587,7 +587,7 @@ class Analyzer( Sort(evaluatedOrderings, global, aggregate.copy(aggregateExpressions = aggExprsWithHaving))) } else { - plan + sort } } From 34bc2b613b05f158723c7128821692c8c43a69c8 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 24 Aug 2015 12:34:46 -0700 Subject: [PATCH 5/9] handle induced ambiguity --- .../sql/catalyst/analysis/Analyzer.scala | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4687c1e07dcf9..be7b4ac53e4ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -564,30 +564,36 @@ class Analyzer( aggregate @ Aggregate(grouping, originalAggExprs, child)) if aggregate.resolved && !sort.resolved => - // Try resolving the condition of the filter as though it is in the aggregate clause - val aliasedOrder = sortOrder.map(o => Alias(o.child, "aggOrder")()) - val aggregatedCondition = Aggregate(grouping, aliasedOrder, child) - val resolvedOperator: Aggregate = execute(aggregatedCondition).asInstanceOf[Aggregate] - def resolvedAggregateOrdering: Seq[NamedExpression] = resolvedOperator.aggregateExpressions - - val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) - val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) - val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty - - // If resolution was successful and we see the filter has an aggregate in it, add it to - // the original aggregate operator. - if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { - val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { - case (order, evaluated) => order.copy(child = evaluated.toAttribute) + // Try resolving the ordering as though it is in the aggregate clause. + try { + val aliasedOrder = sortOrder.map(o => Alias(o.child, "aggOrder")()) + val aggregatedCondition = Aggregate(grouping, aliasedOrder, child) + val resolvedOperator: Aggregate = execute(aggregatedCondition).asInstanceOf[Aggregate] + def resolvedAggregateOrdering = resolvedOperator.aggregateExpressions + + val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) + val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) + val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty + + // If resolution was successful and we see the filter has an aggregate in it, add it to + // the original aggregate operator. + if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { + val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { + case (order, evaluated) => order.copy(child = evaluated.toAttribute) + } + val aggExprsWithHaving: Seq[NamedExpression] = + resolvedAggregateOrdering ++ originalAggExprs + + Project(aggregate.output, + Sort(evaluatedOrderings, global, + aggregate.copy(aggregateExpressions = aggExprsWithHaving))) + } else { + sort } - val aggExprsWithHaving: Seq[NamedExpression] = - resolvedAggregateOrdering ++ originalAggExprs - - Project(aggregate.output, - Sort(evaluatedOrderings, global, - aggregate.copy(aggregateExpressions = aggExprsWithHaving))) - } else { - sort + } catch { + // Attempting to resolve in the aggregate can result in ambiguity. When this happens, + // just return the original plan. + case ae: AnalysisException => sort } } From f00e1e908363fa3167ae6017279aebc8bf007c5c Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 24 Aug 2015 12:37:47 -0700 Subject: [PATCH 6/9] better comment --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index be7b4ac53e4ce..d389574c35783 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -575,7 +575,8 @@ class Analyzer( val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty - // If resolution was successful and we see the filter has an aggregate in it, add it to + // If resolution was successful and we see the ordering either has an aggregate in it or + // it is missing something that is projected away by the aggregate, add the ordering // the original aggregate operator. if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { From 614eb67829a330d6bada0c859fb98da3eaffc5e8 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 24 Aug 2015 13:14:48 -0700 Subject: [PATCH 7/9] naming --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d389574c35783..610c26f65d078 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -582,12 +582,12 @@ class Analyzer( val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { case (order, evaluated) => order.copy(child = evaluated.toAttribute) } - val aggExprsWithHaving: Seq[NamedExpression] = + val aggExprsWithOrdering: Seq[NamedExpression] = resolvedAggregateOrdering ++ originalAggExprs Project(aggregate.output, Sort(evaluatedOrderings, global, - aggregate.copy(aggregateExpressions = aggExprsWithHaving))) + aggregate.copy(aggregateExpressions = aggExprsWithOrdering))) } else { sort } From a52d3055c402936e9c09d5183d396fd86e211cbc Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 24 Aug 2015 13:15:50 -0700 Subject: [PATCH 8/9] naming --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 610c26f65d078..f61de59baf79e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -567,8 +567,8 @@ class Analyzer( // Try resolving the ordering as though it is in the aggregate clause. try { val aliasedOrder = sortOrder.map(o => Alias(o.child, "aggOrder")()) - val aggregatedCondition = Aggregate(grouping, aliasedOrder, child) - val resolvedOperator: Aggregate = execute(aggregatedCondition).asInstanceOf[Aggregate] + val aggregatedOrdering = Aggregate(grouping, aliasedOrder, child) + val resolvedOperator: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] def resolvedAggregateOrdering = resolvedOperator.aggregateExpressions val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) From ab80fad1ecc26f3160eebee5daa579e8dc556c97 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 24 Aug 2015 15:25:05 -0700 Subject: [PATCH 9/9] don't push down unless in grouping expressions --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f61de59baf79e..1a5de15c61f86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -571,14 +571,20 @@ class Analyzer( val resolvedOperator: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] def resolvedAggregateOrdering = resolvedOperator.aggregateExpressions + // Expressions that have an aggregate can be pushed down. val needsAggregate = resolvedAggregateOrdering.exists(containsAggregate) + + // Attribute references, that are missing from the order but are present in the grouping + // expressions can also be pushed down. val requiredAttributes = resolvedAggregateOrdering.map(_.references).reduce(_ ++ _) - val missingAttributes = (requiredAttributes -- aggregate.outputSet).nonEmpty + val missingAttributes = requiredAttributes -- aggregate.outputSet + val validPushdownAttributes = + missingAttributes.filter(a => grouping.exists(a.semanticEquals)) // If resolution was successful and we see the ordering either has an aggregate in it or // it is missing something that is projected away by the aggregate, add the ordering // the original aggregate operator. - if (resolvedOperator.resolved && (needsAggregate || missingAttributes)) { + if (resolvedOperator.resolved && (needsAggregate || validPushdownAttributes.nonEmpty)) { val evaluatedOrderings: Seq[SortOrder] = sortOrder.zip(resolvedAggregateOrdering).map { case (order, evaluated) => order.copy(child = evaluated.toAttribute) }