Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
DistinctAggregationRewriter(conf) ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,8 @@ import org.apache.spark.sql.types.IntegerType
*/
case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.resolved => p
// We need to wait until this Aggregate operator is resolved.
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case a: Aggregate => rewrite(a)
case p => p
}

def rewrite(a: Aggregate): Aggregate = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.Inner
Expand All @@ -30,14 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._

abstract class Optimizer extends RuleExecutor[LogicalPlan]

object DefaultOptimizer extends Optimizer {
abstract class Optimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] {
val batches =
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::
Batch("Aggregate", FixedPoint(100),
DistinctAggregationRewriter(conf),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
Batch("Operator Optimizations", FixedPoint(100),
Expand Down Expand Up @@ -68,6 +68,18 @@ object DefaultOptimizer extends Optimizer {
Batch("LocalRelation", FixedPoint(100),
ConvertToLocalRelation) :: Nil
}
/**
 * Default concrete [[Optimizer]]. It forwards `conf` to the abstract base class and
 * declares no members of its own.
 */
case class DefaultOptimizer(conf: CatalystConf) extends Optimizer(conf)

/**
 * An optimizer used in test code.
 *
 * To keep the optimizer extensible, the standard rules stay in the abstract [[Optimizer]],
 * while specific rules go into its subclasses.
 */
object SimpleTestOptimizer extends SimpleTestOptimizer

// Optimizer subclass constructed with a SimpleCatalystConf whose caseSensitiveAnalysis is true.
class SimpleTestOptimizer extends Optimizer(
new SimpleCatalystConf(caseSensitiveAnalysis = true))

/**
* Pushes operations down into a Sample.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -189,7 +189,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -150,7 +150,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
expression: Expression,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class SQLContext private[sql](
}

@transient
protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer(conf)

@transient
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ class PlannerSuite extends SharedSQLContext {
fail(s"Could query play aggregation query $query. Is it an aggregation query?"))
val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n }

// For the new aggregation code path, there will be four aggregate operator for
// distinct aggregations.
// For the new aggregation code path, there will be three aggregate operators for
// distinct aggregations. There used to be four aggregate operators because a single
// distinct aggregate used to trigger the DistinctAggregationRewriter rewrite. Now
// the rewrite only happens when there are multiple distinct aggregations.
assert(
aggregations.size == 2 || aggregations.size == 4,
aggregations.size == 2 || aggregations.size == 3,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? This PR just moves the rewrite rule to the beginning of the optimizer rules, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before my change, which moves the DistinctAggregationRewriter rule to the Optimizer, the Analyzer resolved the single distinct case into two layers of non-distinct Aggregates and an Expand, as follows:

Aggregate [value#3], [value#3,(count(if ((gid#48 = 1)) key#49 else null),mode=Complete,isDistinct=false) AS count(key)#47L]
+- Aggregate [value#3,key#49,gid#48], [value#3,key#49,gid#48]
   +- Expand [ArrayBuffer(value#3, key#2, 1)], [value#3,key#49,gid#48]
      +- LogicalRDD [key#2,value#3], MapPartitionsRDD[3] at beforeAll at BeforeAndAfterAll.scala:187

The resulting physical plan in this test case then had 4 TungstenAggregate operators, generated by aggregate/utils.planAggregateWithoutDistinct. That is why this test case checked for 4 aggregations.

After my change, there is no longer a rewrite for the single distinct aggregate, and the analyzed plan looks like this:

Aggregate [value#3], [value#3,(count(key#2),mode=Complete,isDistinct=true) AS count(key)#47L]
+- LogicalRDD [key#2,value#3], MapPartitionsRDD[3] at beforeAll at BeforeAndAfterAll.scala:187

The resulting physical plan then has 3 TungstenAggregate operators, generated by aggregate/utils.planAggregateWithOneDistinct.

Therefore, I need to modify this testcase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there no rewrite after your change? You just moved the rule but didn't change it.

Copy link
Contributor Author

@xwu0226 xwu0226 May 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rewrite I mentioned was the one performed by the Analyzer. In this test case, the analyzed logical plan, rather than the optimized one, is passed on to generate the physical plan. So after I moved the rule, this test case no longer sees a rewrite for a single distinct aggregation.

s"The plan of query $query does not have partial aggregations.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,22 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
Row(11) :: Nil)
}
}

// SPARK-14495: a query with two DISTINCT aggregates in the select list and a DISTINCT
// aggregate in its HAVING clause must return the rows listed below.
test("SPARK-14495: distinct aggregate in having clause") {
val query =
"""
|select key, count(distinct value1), count(distinct value2)
|from agg2 group by key
|having count(distinct value1) > 0
""".stripMargin
// Run the query before building the expected rows, as the original call did.
val actual = sqlContext.sql(query)
val expectedRows = Seq(Row(null, 3, 3), Row(1, 2, 3), Row(2, 2, 1))
checkAnswer(actual, expectedRows)
}
}


Expand Down