From 557298e3d88c04910ebff9cdb1ae77a1537c83af Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 12 May 2017 22:27:57 +0800
Subject: [PATCH 1/2] partial aggregate should behave correctly for sameResult

---
 .../expressions/aggregate/interfaces.scala    | 14 ++++++++++++--
 .../spark/sql/execution/SameResultSuite.scala |  8 ++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 80c25d0b0fb7..fffcc7c9ef53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -105,12 +105,22 @@ case class AggregateExpression(
   }
 
   // We compute the same thing regardless of our final result.
-  override lazy val canonicalized: Expression =
+  override lazy val canonicalized: Expression = {
+    val normalizedAggFunc = mode match {
+      // For PartialMerge or Final mode, the input to the `aggregateFunction` is aggregate buffers,
+      // and the actual children of `aggregateFunction` are not used, so here we normalize the expr id.
+      case PartialMerge | Final => aggregateFunction.transform {
+        case a: AttributeReference => a.withExprId(ExprId(0))
+      }
+      case Partial | Complete => aggregateFunction
+    }
+
     AggregateExpression(
-      aggregateFunction.canonicalized.asInstanceOf[AggregateFunction],
+      normalizedAggFunc.canonicalized.asInstanceOf[AggregateFunction],
       mode,
       isDistinct,
       ExprId(0))
+  }
 
   override def children: Seq[Expression] = aggregateFunction :: Nil
 
   override def dataType: DataType = aggregateFunction.dataType
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
index 25e4ca060ae0..668c90a86330 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
  * Tests for the sameResult function for [[SparkPlan]]s.
  */
 class SameResultSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("FileSourceScanExec: different orders of data filters and partition filters") {
     withTempPath { path =>
@@ -46,4 +48,10 @@ class SameResultSuite extends QueryTest with SharedSQLContext {
     df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
       .asInstanceOf[FileSourceScanExec]
   }
+
+  test("SPARK-20725: partial aggregate should behave correctly for sameResult") {
+    val df1 = spark.range(10).agg(sum($"id"))
+    val df2 = spark.range(10).agg(sum($"id"))
+    assert(df1.queryExecution.executedPlan.sameResult(df2.queryExecution.executedPlan))
+  }
 }
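A minimal way to reproduce what patch 1 fixes, outside the test suite. This is a sketch, not part of the patch: it assumes a Spark build containing this change on the classpath, and the object name and local master setting are illustrative.

// Sketch: before this patch, the two Final-mode aggregate nodes referenced
// their aggregate buffers with fresh ExprIds, so canonicalization produced
// different trees and sameResult returned false for identical queries.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object SameResultDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SameResultDemo")
      .getOrCreate()
    import spark.implicits._

    val df1 = spark.range(10).agg(sum($"id"))
    val df2 = spark.range(10).agg(sum($"id"))

    // Prints true with this patch applied, false without it.
    println(df1.queryExecution.executedPlan.sameResult(df2.queryExecution.executedPlan))

    spark.stop()
  }
}

sameResult is what Spark uses to decide whether two physical plans are interchangeable, so a false negative here defeats plan reuse for identical aggregations.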
From 49da955dce260260325708d07becbc692cd3a005 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sat, 13 May 2017 20:34:58 +0800
Subject: [PATCH 2/2] fix another bug

---
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 4 ++--
 .../org/apache/spark/sql/execution/SameResultSuite.scala      | 4 ++++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 2fb65bd43550..a86e9d8a936b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -286,7 +286,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     def recursiveTransform(arg: Any): AnyRef = arg match {
       case e: Expression => transformExpression(e)
-      case Some(e: Expression) => Some(transformExpression(e))
+      case Some(value) => Some(recursiveTransform(value))
       case m: Map[_, _] => m
       case d: DataType => d // Avoid unpacking Structs
       case seq: Traversable[_] => seq.map(recursiveTransform)
@@ -320,7 +320,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     productIterator.flatMap {
       case e: Expression => e :: Nil
-      case Some(e: Expression) => e :: Nil
+      case s: Some[_] => seqToExpressions(s.toSeq)
       case seq: Traversable[_] => seqToExpressions(seq)
       case other => Nil
     }.toSeq
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
index 668c90a86330..aaf51b5b9011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala
@@ -53,5 +53,9 @@ class SameResultSuite extends QueryTest with SharedSQLContext {
     val df1 = spark.range(10).agg(sum($"id"))
     val df2 = spark.range(10).agg(sum($"id"))
     assert(df1.queryExecution.executedPlan.sameResult(df2.queryExecution.executedPlan))
+
+    val df3 = spark.range(10).agg(sumDistinct($"id"))
+    val df4 = spark.range(10).agg(sumDistinct($"id"))
+    assert(df3.queryExecution.executedPlan.sameResult(df4.queryExecution.executedPlan))
   }
 }
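Patch 2 generalizes QueryPlan's Option handling: the old pattern only matched an Option that directly wrapped an Expression, so expressions nested deeper (for example inside Some(Seq(...)), as with the distinct-aggregate case the new test exercises) were never transformed or collected. The following self-contained sketch uses a toy Expr type, not the real Catalyst classes, to show the difference.

// Sketch with toy types (not Catalyst). The old pattern
//   case Some(e: Expr) => Some(normalize(e))
// silently skipped values like Some(Seq(expr)); recursing on the Option's
// payload, as the patch does, reaches expressions at any nesting depth.
object OptionTransformDemo extends App {
  sealed trait Expr
  case class Attr(name: String, id: Long) extends Expr

  // Normalize ids to a fixed value, analogous to ExprId(0) in the patch.
  def normalize(e: Expr): Expr = e match {
    case Attr(n, _) => Attr(n, 0L)
  }

  def recursiveTransform(arg: Any): Any = arg match {
    case e: Expr => normalize(e)
    case Some(value) => Some(recursiveTransform(value)) // the fixed case
    case seq: Traversable[_] => seq.map(recursiveTransform)
    case other => other
  }

  // The expression hidden inside Some(Seq(...)) is now reached and normalized.
  assert(recursiveTransform(Some(Seq(Attr("a", 42L)))) == Some(Seq(Attr("a", 0L))))
  println("ok")
}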